From 4bb4ebdf823819ea05bcb976dd5adf32ed88efeb Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Tue, 21 Jan 2025 16:36:45 +0100 Subject: [PATCH] Further cleanup, abstracts 'dropins' out into generic files --- .../archivers/generic_archiver/bluesky.py | 132 ++++++------ .../archivers/generic_archiver/dropin.py | 58 +++++ .../generic_archiver/generic_archiver.py | 201 +++++++++--------- .../archivers/generic_archiver/truth.py | 73 ++++--- .../archivers/generic_archiver/twitter.py | 108 +++++----- src/auto_archiver/feeders/csv_feeder.py | 41 ++++ src/auto_archiver/utils/__init__.py | 2 +- 7 files changed, 371 insertions(+), 244 deletions(-) create mode 100644 src/auto_archiver/archivers/generic_archiver/dropin.py create mode 100644 src/auto_archiver/feeders/csv_feeder.py diff --git a/src/auto_archiver/archivers/generic_archiver/bluesky.py b/src/auto_archiver/archivers/generic_archiver/bluesky.py index 684124b..821d777 100644 --- a/src/auto_archiver/archivers/generic_archiver/bluesky.py +++ b/src/auto_archiver/archivers/generic_archiver/bluesky.py @@ -7,69 +7,75 @@ from loguru import logger from auto_archiver.core.context import ArchivingContext from auto_archiver.archivers.archiver import Archiver from auto_archiver.core.metadata import Metadata, Media +from .dropin import GenericDropin, InfoExtractor + +class Bluesky(GenericDropin): + + def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata: + result = Metadata() + result.set_url(url) + result.set_title(post["record"]["text"]) + result.set_timestamp(post["record"]["createdAt"]) + for k, v in self._get_post_data(post).items(): + if v: result.set(k, v) + + # download if embeds present (1 video XOR >=1 images) + for media in self._download_bsky_embeds(post, archiver): + result.add_media(media) + logger.debug(f"Downloaded {len(result.media)} media files") + + return result + + def extract_post(self, url: str, ie_instance: InfoExtractor) -> dict: + handle, video_id = ie_instance._match_valid_url(url).group('handle', 'id') + return ie_instance._extract_post(handle=handle, post_id=video_id) + + def _download_bsky_embeds(self, post: dict, archiver: Archiver) -> list[Media]: + """ + Iterates over image(s) or video in a Bluesky post and downloads them + """ + media = [] + embed = post.get("record", {}).get("embed", {}) + image_medias = embed.get("images", []) + embed.get("media", {}).get("images", []) + video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e] + + media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}" + for image_media in image_medias: + url = media_url.format(image_media['image']['ref']['$link'], post['author']['did']) + image_media = archiver.download_from_url(url) + media.append(image_media) + for video_media in video_medias: + url = media_url.format(video_media['ref']['$link'], post['author']['did']) + video_media = archiver.download_from_url(url) + media.append(video_media) + return media -def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata: - result = Metadata() - result.set_url(url) - result.set_title(post["record"]["text"]) - result.set_timestamp(post["record"]["createdAt"]) - for k, v in _get_post_data(post).items(): - if v: result.set(k, v) + def _get_post_data(self, post: dict) -> dict: + """ + Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links. 
+ """ + author = post["author"] + if "labels" in author and not author["labels"]: + del author["labels"] + if "associated" in author: + del author["associated"] - # download if embeds present (1 video XOR >=1 images) - for media in _download_bsky_embeds(post, archiver): - result.add_media(media) - logger.debug(f"Downloaded {len(result.media)} media files") - - return result - -def _download_bsky_embeds(post: dict, archiver: Archiver) -> list[Media]: - """ - Iterates over image(s) or video in a Bluesky post and downloads them - """ - media = [] - embed = post.get("record", {}).get("embed", {}) - image_medias = embed.get("images", []) + embed.get("media", {}).get("images", []) - video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e] - - media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}" - for image_media in image_medias: - url = media_url.format(image_media['image']['ref']['$link'], post['author']['did']) - image_media = archiver.download_from_url(url) - media.append(image_media) - for video_media in video_medias: - url = media_url.format(video_media['ref']['$link'], post['author']['did']) - video_media = archiver.download_from_url(url) - media.append(video_media) - return media - - -def _get_post_data(post: dict) -> dict: - """ - Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links. - """ - author = post["author"] - if "labels" in author and not author["labels"]: - del author["labels"] - if "associated" in author: - del author["associated"] - - mentions, tags, links = [], [], [] - facets = post.get("record", {}).get("facets", []) - for f in facets: - for feature in f["features"]: - if feature["$type"] == "app.bsky.richtext.facet#mention": - mentions.append(feature["did"]) - elif feature["$type"] == "app.bsky.richtext.facet#tag": - tags.append(feature["tag"]) - elif feature["$type"] == "app.bsky.richtext.facet#link": - links.append(feature["uri"]) - res = {"author": author} - if mentions: - res["mentions"] = mentions - if tags: - res["tags"] = tags - if links: - res["links"] = links - return res \ No newline at end of file + mentions, tags, links = [], [], [] + facets = post.get("record", {}).get("facets", []) + for f in facets: + for feature in f["features"]: + if feature["$type"] == "app.bsky.richtext.facet#mention": + mentions.append(feature["did"]) + elif feature["$type"] == "app.bsky.richtext.facet#tag": + tags.append(feature["tag"]) + elif feature["$type"] == "app.bsky.richtext.facet#link": + links.append(feature["uri"]) + res = {"author": author} + if mentions: + res["mentions"] = mentions + if tags: + res["tags"] = tags + if links: + res["links"] = links + return res \ No newline at end of file diff --git a/src/auto_archiver/archivers/generic_archiver/dropin.py b/src/auto_archiver/archivers/generic_archiver/dropin.py new file mode 100644 index 0000000..37f3faf --- /dev/null +++ b/src/auto_archiver/archivers/generic_archiver/dropin.py @@ -0,0 +1,58 @@ +from yt_dlp.extractor.common import InfoExtractor +from auto_archiver.core.metadata import Metadata +from auto_archiver.archivers.archiver import Archiver + +class GenericDropin: + """Base class for dropins for the generic extractor. + + In many instances, an extractor will exist in ytdlp, but it will only process videos. + Dropins can be created and used to make use of the already-written private code of a + specific extractor from ytdlp. 
+ + The dropin should be able to handle the following methods: + + - `get_post_data`: This method should be able to extract the post data from the url and return it as a dict. + - `create_metadata`: This method should be able to create a Metadata object from a post dict. + + Optional methods include: + + - `skip_ytdlp_download`: If you want to skip the ytdlp 'download' method all together, and do your own, then return True for this method. + This is useful in cases where ytdlp might not work properly for all of your posts + - `keys_to_clean`: for the generic 'video_data' created by ytdlp (for video URLs), any additional fields you would like to clean out of the data before storing in metadata + + + """ + + def extract_post(self, url: str, ie_instance: InfoExtractor): + """ + This method should return the post data from the url. + """ + raise NotImplementedError("This method should be implemented in the subclass") + + + def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata: + """ + This method should create a Metadata object from the post data. + """ + raise NotImplementedError("This method should be implemented in the subclass") + + + def skip_ytdlp_download(self, url: str, ie_instance: InfoExtractor): + """ + This method should return True if you want to skip the ytdlp download method. + """ + return False + + def keys_to_clean(self, video_data: dict, info_extractor: InfoExtractor): + """ + This method should return a list of strings (keys) to clean from the video_data dict. + + E.g. ["uploader", "uploader_id", "tiktok_specific_field"] + """ + return [] + + def download_additional_media(self, video_data: dict, info_extractor: InfoExtractor, metadata: Metadata): + """ + This method should download any additional media from the post. + """ + return metadata \ No newline at end of file diff --git a/src/auto_archiver/archivers/generic_archiver/generic_archiver.py b/src/auto_archiver/archivers/generic_archiver/generic_archiver.py index 41f1314..511c7e4 100644 --- a/src/auto_archiver/archivers/generic_archiver/generic_archiver.py +++ b/src/auto_archiver/archivers/generic_archiver/generic_archiver.py @@ -1,16 +1,16 @@ import datetime, os, yt_dlp, pysubs2 +import importlib from typing import Type from yt_dlp.extractor.common import InfoExtractor from loguru import logger -from . 
import bluesky, twitter, truth from auto_archiver.archivers.archiver import Archiver from ...core import Metadata, Media, ArchivingContext - class GenericArchiver(Archiver): name = "youtubedl_archiver" #left as is for backwards compat + _dropins = {} def __init__(self, config: dict) -> None: super().__init__(config) @@ -22,23 +22,22 @@ class GenericArchiver(Archiver): self.allow_playlist = bool(self.allow_playlist) self.max_downloads = self.max_downloads - @staticmethod - def configs() -> dict: - return { - "facebook_cookie": {"default": None, "help": "optional facebook cookie to have more access to content, from browser, looks like 'cookie: datr= xxxx'"}, - "subtitles": {"default": True, "help": "download subtitles if available"}, - "comments": {"default": False, "help": "download all comments if available, may lead to large metadata"}, - "livestreams": {"default": False, "help": "if set, will download live streams, otherwise will skip them; see --max-filesize for more control"}, - "live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."}, - "proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"}, - "end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."}, - 'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."}, - "max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."}, - "cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"}, - "cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"}, - } + + def suitable_extractors(self, url: str) -> list[str]: + """ + Returns a list of valid extractors for the given URL""" + for info_extractor in yt_dlp.YoutubeDL()._ies.values(): + if info_extractor.suitable(url) and info_extractor.working(): + yield info_extractor + + def suitable(self, url: str) -> bool: + """ + Checks for valid URLs out of all ytdlp extractors. + Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites' + """ + return any(self.suitable_extractors(url)) - def download_additional_media(self, extractor_key: str, video_data: dict, metadata: Metadata) -> Metadata: + def download_additional_media(self, video_data: dict, info_extractor: InfoExtractor, metadata: Metadata) -> Metadata: """ Downloads additional media like images, comments, subtitles, etc. 
@@ -56,11 +55,18 @@ class GenericArchiver(Archiver): except Exception as e: logger.error(f"Error downloading cover image {thumbnail_url}: {e}") + dropin = self.dropin_for_extractor(info_extractor) + if dropin: + try: + metadata = dropin.download_additional_media(video_data, info_extractor, metadata) + except AttributeError: + pass + return metadata - def keys_to_clean(self, extractor_key: str, video_data: dict) -> dict: + def keys_to_clean(self, info_extractor: InfoExtractor, video_data: dict) -> dict: """ - Clean up the video data to make it more readable and remove unnecessary keys that ytdlp adds + Clean up the ytdlp generic video data to make it more readable and remove unnecessary keys that ytdlp adds """ base_keys = ['formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads', @@ -71,23 +77,23 @@ class GenericArchiver(Archiver): 'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status', '_format_sort_fields', 'chapters', 'requested_formats', 'format_note', 'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio'] - if extractor_key == 'TikTok': - # Tiktok: only has videos so a valid ytdlp `video_data` object is returned. Base keys are enough - return base_keys + [] - elif extractor_key == "Bluesky": - # bluesky API response for non video URLs is already clean, nothing to add - return base_keys + [] - + dropin = self.dropin_for_extractor(info_extractor) + if dropin: + try: + base_keys += dropin.keys_to_clean(video_data, info_extractor) + except AttributeError: + pass + return base_keys - def add_metadata(self, extractor_key: str, video_data: dict, url:str, result: Metadata) -> Metadata: + def add_metadata(self, video_data: dict, info_extractor: InfoExtractor, url:str, result: Metadata) -> Metadata: """ - Creates a Metadata object from the give video_data + Creates a Metadata object from the given video_data """ # first add the media - result = self.download_additional_media(extractor_key, video_data, result) + result = self.download_additional_media(video_data, info_extractor, result) # keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist result.set_title(video_data.pop('title', video_data.pop('fulltitle', ""))) @@ -110,7 +116,7 @@ class GenericArchiver(Archiver): result.set("upload_date", upload_date) # then clean away any keys we don't want - for clean_key in self.keys_to_clean(extractor_key, video_data): + for clean_key in self.keys_to_clean(info_extractor, video_data): video_data.pop(clean_key, None) # then add the rest of the video data @@ -119,35 +125,6 @@ class GenericArchiver(Archiver): result.set(k, v) return result - - def suitable_extractors(self, url: str) -> list[str]: - """ - Returns a list of valid extractors for the given URL""" - for info_extractor in yt_dlp.YoutubeDL()._ies.values(): - if info_extractor.suitable(url) and info_extractor.working(): - yield info_extractor - - def suitable(self, url: str) -> bool: - """ - Checks for valid URLs out of all ytdlp extractors. - Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites' - """ - return any(self.suitable_extractors(url)) - - def create_metadata_for_post(self, info_extractor: InfoExtractor, post_data: dict, url: str) -> Metadata: - """ - Standardizes the output of the 'post' data from a ytdlp InfoExtractor to Metadata object. 
- - This is only required for platforms that don't have videos, and therefore cannot be converted into ytdlp valid 'video_data'. - In these instances, we need to use the extractor's _extract_post (or similar) method to get the post metadata, and then convert - it into a Metadata object via a platform-specific function. - """ - if info_extractor.ie_key() == 'Bluesky': - return bluesky.create_metadata(post_data, self, url) - if info_extractor.ie_key() == 'Twitter': - return twitter.create_metadata(post_data, self, url) - if info_extractor.ie_key() == 'Truth': - return truth.create_metadata(post_data, self, url) def get_metatdata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: """ @@ -156,45 +133,29 @@ class GenericArchiver(Archiver): ie_instance = info_extractor(downloader=ydl) post_data = None - - if info_extractor.ie_key() == 'Bluesky': - # bluesky kwargs are handle, video_id - handle, video_id = ie_instance._match_valid_url(url).group('handle', 'id') - post_data = ie_instance._extract_post(handle=handle, post_id=video_id) - elif info_extractor.ie_key() == 'Twitter': - # twitter kwargs are tweet_id - twid = ie_instance._match_valid_url(url).group('id') - # TODO: if ytdlp PR https://github.com/yt-dlp/yt-dlp/pull/12098 is merged, change to _extract_post - post_data = ie_instance._extract_status(twid=twid) - elif info_extractor.ie_key() == 'Truth': - video_id = ie_instance._match_id(url) - truthsocial_url = f'https://truthsocial.com/api/v1/statuses/{video_id}' - post_data = ie_instance._download_json(truthsocial_url, video_id) - else: - # lame attempt at trying to get data for an unknown extractor - # TODO: test some more video platforms and see if there's any improvement to be made - try: - post_data = ie_instance._extract_post(url) - except (NotImplementedError, AttributeError) as e: - logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info from non-video URLs: {e}") - return False - - return self.create_metadata_for_post(ie_instance, post_data, url) + dropin = self.dropin_for_extractor(info_extractor) + if not dropin: + # TODO: add a proper link to 'how to create your own dropin' + logger.debug(f"""Could not find valid dropin for {info_extractor.IE_NAME}. + Why not try creating your own, and make sure it has a valid function called 'create_metadata'. Learn more: https://auto-archiver.readthedocs.io/en/latest/user_guidelines.html#""") + return False - def get_metadata_for_video(self, info: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: + post_data = dropin.extract_post(url, ie_instance) + return dropin.create_metadata(post_data, ie_instance, self, url) + + def get_metadata_for_video(self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: # this time download ydl.params['getcomments'] = self.comments #TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded? 
- info = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=True) - if "entries" in info: - entries = info.get("entries", []) + data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=True) + if "entries" in data: + entries = data.get("entries", []) if not len(entries): logger.warning('YoutubeDLArchiver could not find any video') return False - else: entries = [info] + else: entries = [data] - extractor_key = info['extractor_key'] result = Metadata() for entry in entries: @@ -209,7 +170,7 @@ class GenericArchiver(Archiver): # read text from subtitles if enabled if self.subtitles: - for lang, val in (info.get('requested_subtitles') or {}).items(): + for lang, val in (data.get('requested_subtitles') or {}).items(): try: subs = pysubs2.load(val.get('filepath'), encoding="utf-8") text = " ".join([line.text for line in subs]) @@ -220,9 +181,49 @@ class GenericArchiver(Archiver): except Exception as e: logger.error(f"Error processing entry {entry}: {e}") - return self.add_metadata(extractor_key, info, url, result) + return self.add_metadata(data, info_extractor, url, result) + + def dropin_for_extractor(self, info_extractor: Type[InfoExtractor], additional_paths = []): + dropin_name = info_extractor.ie_key().lower() - def download_for_extractor(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: + if dropin_name == "generic": + # no need for a dropin for the generic extractor (?) + return None + + dropin_class_name = dropin_name.title() + def _load_dropin(dropin): + dropin_class = getattr(dropin, dropin_class_name)() + return self._dropins.setdefault(dropin_name, dropin_class) + + try: + return self._dropins[dropin_name] + except KeyError: + pass + + # TODO: user should be able to pass --dropins="/some/folder,/other/folder" as a cmd line option + # which would allow the user to override the default dropins/add their own + paths = [] + additional_paths + for path in paths: + dropin_path = os.path.join(path, f"{dropin_name}.py") + dropin_spec = importlib.util.spec_from_file_location(dropin_name, dropin_path) + if not dropin_spec: + continue + try: + dropin = importlib.util.module_from_spec(dropin_spec) + dropin_spec.loader.exec_module(dropin) + return _load_dropin(dropin) + except (FileNotFoundError, ModuleNotFoundError): + pass + + # fallback to loading the dropins within auto-archiver + try: + return _load_dropin(importlib.import_module(f".{dropin_name}", package=__package__)) + except ModuleNotFoundError: + pass + + return None + + def download_for_extractor(self, info_extractor: InfoExtractor, url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: """ Tries to download the given url using the specified extractor @@ -233,19 +234,19 @@ class GenericArchiver(Archiver): ydl.params['getcomments'] = False result = False + dropin_submodule = self.dropin_for_extractor(info_extractor) + try: - if info_extractor.ie_key() == "Truth": - # the ytdlp truth extractor currently only gets the first image/video in the 'media' section, as opposed to all of them - # we don't want this - raise yt_dlp.utils.ExtractorError("Use the 'post data' method for Truth posts") + if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url): + raise Exception(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}") # don't download since it can be a live stream - info = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False) - if info.get('is_live', False) and not self.livestreams: + data = ydl.extract_info(url, 
ie_key=info_extractor.ie_key(), download=False) + if data.get('is_live', False) and not self.livestreams: logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting") return False # it's a valid video, that the youtubdedl can download out of the box - result = self.get_metadata_for_video(info, info_extractor, url, ydl) + result = self.get_metadata_for_video(data, info_extractor, url, ydl) except Exception as e: logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead') diff --git a/src/auto_archiver/archivers/generic_archiver/truth.py b/src/auto_archiver/archivers/generic_archiver/truth.py index 00551f3..bf19dce 100644 --- a/src/auto_archiver/archivers/generic_archiver/truth.py +++ b/src/auto_archiver/archivers/generic_archiver/truth.py @@ -1,39 +1,52 @@ -import datetime +from typing import Type from auto_archiver.utils import traverse_obj from auto_archiver.core.metadata import Metadata, Media from auto_archiver.archivers.archiver import Archiver +from yt_dlp.extractor.common import InfoExtractor from dateutil.parser import parse as parse_dt -def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata: - """ - Creates metadata from a truth social post - - Only used for posts that contain no media. ytdlp.TruthIE extractor can handle posts with media - - Format is: - - {'id': '109598702184774628', 'created_at': '2022-12-29T19:51:18.161Z', 'in_reply_to_id': None, 'quote_id': None, 'in_reply_to_account_id': None, 'sensitive': False, 'spoiler_text': '', 'visibility': 'public', 'language': 'en', 'uri': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'url': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'content': '
Pele, regarded by many as football\'s greatest ever player, has died in Brazil at the age of 82. bbc.com/sport/football/4275151', 'account': {'id': '107905163010312793', 'username': 'bbcnewa', 'acct': 'bbcnewa', 'display_name': 'BBC News', 'locked': False, 'bot': False, 'discoverable': True, 'group': False, 'created_at': '2022-03-05T17:42:01.159Z', 'note': 'News, features and analysis by the BBC
', 'url': 'https://truthsocial.com/@bbcnewa', 'avatar': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'avatar_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'header': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'header_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'followers_count': 1131, 'following_count': 3, 'statuses_count': 9, 'last_status_at': '2024-11-12', 'verified': False, 'location': '', 'website': 'https://www.bbc.com/news', 'unauth_visibility': True, 'chats_onboarded': True, 'feeds_onboarded': True, 'accepting_messages': False, 'show_nonmember_group_statuses': None, 'emojis': [], 'fields': [], 'tv_onboarded': True, 'tv_account': False}, 'media_attachments': [], 'mentions': [], 'tags': [], 'card': None, 'group': None, 'quote': None, 'in_reply_to': None, 'reblog': None, 'sponsored': False, 'replies_count': 1, 'reblogs_count': 0, 'favourites_count': 2, 'favourited': False, 'reblogged': False, 'muted': False, 'pinned': False, 'bookmarked': False, 'poll': None, 'emojis': []} - """ - breakpoint() - result = Metadata() - result.set_url(url) - timestamp = post['created_at'] # format is 2022-12-29T19:51:18.161Z - result.set_timestamp(parse_dt(timestamp)) - result.set('description', post['content']) - result.set('author', post['account']['username']) +from .dropin import GenericDropin - for key in ['replies_count', 'reblogs_count', 'favourites_count', ('account', 'followers_count'), ('account', 'following_count'), ('account', 'statuses_count'), ('account', 'display_name'), 'language', 'in_reply_to_account', 'replies_count']: - if isinstance(key, tuple): - store_key = " ".join(key) - else: - store_key = key - result.set(store_key, traverse_obj(post, key)) - - # add the media - for media in post.get('media_attachments', []): - filename = archiver.download_from_url(media['url']) - result.add_media(Media(filename), id=media.get('id')) +class Truth(GenericDropin): - return result \ No newline at end of file + def extract_post(self, url, ie_instance: InfoExtractor) -> dict: + video_id = ie_instance._match_id(url) + truthsocial_url = f'https://truthsocial.com/api/v1/statuses/{video_id}' + return ie_instance._download_json(truthsocial_url, video_id) + + def skip_ytdlp_download(self, url, ie_instance: Type[InfoExtractor]) -> bool: + return True + + def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata: + """ + Creates metadata from a truth social post + + Only used for posts that contain no media. ytdlp.TruthIE extractor can handle posts with media + + Format is: + + {'id': '109598702184774628', 'created_at': '2022-12-29T19:51:18.161Z', 'in_reply_to_id': None, 'quote_id': None, 'in_reply_to_account_id': None, 'sensitive': False, 'spoiler_text': '', 'visibility': 'public', 'language': 'en', 'uri': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'url': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'content': '
Pele, regarded by many as football\'s greatest ever player, has died in Brazil at the age of 82. bbc.com/sport/football/4275151', 'account': {'id': '107905163010312793', 'username': 'bbcnewa', 'acct': 'bbcnewa', 'display_name': 'BBC News', 'locked': False, 'bot': False, 'discoverable': True, 'group': False, 'created_at': '2022-03-05T17:42:01.159Z', 'note': 'News, features and analysis by the BBC
', 'url': 'https://truthsocial.com/@bbcnewa', 'avatar': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'avatar_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'header': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'header_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'followers_count': 1131, 'following_count': 3, 'statuses_count': 9, 'last_status_at': '2024-11-12', 'verified': False, 'location': '', 'website': 'https://www.bbc.com/news', 'unauth_visibility': True, 'chats_onboarded': True, 'feeds_onboarded': True, 'accepting_messages': False, 'show_nonmember_group_statuses': None, 'emojis': [], 'fields': [], 'tv_onboarded': True, 'tv_account': False}, 'media_attachments': [], 'mentions': [], 'tags': [], 'card': None, 'group': None, 'quote': None, 'in_reply_to': None, 'reblog': None, 'sponsored': False, 'replies_count': 1, 'reblogs_count': 0, 'favourites_count': 2, 'favourited': False, 'reblogged': False, 'muted': False, 'pinned': False, 'bookmarked': False, 'poll': None, 'emojis': []} + """ + + result = Metadata() + result.set_url(url) + timestamp = post['created_at'] # format is 2022-12-29T19:51:18.161Z + result.set_timestamp(parse_dt(timestamp)) + result.set('description', post['content']) + result.set('author', post['account']['username']) + + for key in ['replies_count', 'reblogs_count', 'favourites_count', ('account', 'followers_count'), ('account', 'following_count'), ('account', 'statuses_count'), ('account', 'display_name'), 'language', 'in_reply_to_account', 'replies_count']: + if isinstance(key, tuple): + store_key = " ".join(key) + else: + store_key = key + result.set(store_key, traverse_obj(post, key)) + + # add the media + for media in post.get('media_attachments', []): + filename = archiver.download_from_url(media['url']) + result.add_media(Media(filename), id=media.get('id')) + + return result \ No newline at end of file diff --git a/src/auto_archiver/archivers/generic_archiver/twitter.py b/src/auto_archiver/archivers/generic_archiver/twitter.py index 8cc323c..ce6c28d 100644 --- a/src/auto_archiver/archivers/generic_archiver/twitter.py +++ b/src/auto_archiver/archivers/generic_archiver/twitter.py @@ -8,55 +8,63 @@ from auto_archiver.core.metadata import Metadata, Media from auto_archiver.utils import UrlUtil from auto_archiver.archivers.archiver import Archiver +from .dropin import GenericDropin, InfoExtractor -def choose_variant(variants): - # choosing the highest quality possible - variant, width, height = None, 0, 0 - for var in variants: - if var.get("content_type", "") == "video/mp4": - width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"]) - if width_height: - w, h = int(width_height[1]), int(width_height[2]) - if w > width or h > height: - width, height = w, h - variant = var - else: - variant = var if not variant else variant - return variant +class Twitter(GenericDropin): -def create_metadata(tweet: dict, archiver: Archiver, url: str) -> Metadata: - result = Metadata() - try: - if not tweet.get("user") or not tweet.get("created_at"): - raise ValueError(f"Error retreiving post. 
Are you sure it exists?")
-        timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
-    except (ValueError, KeyError) as ex:
-        logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
-        return False
-
-    result\
-        .set_title(tweet.get('full_text', ''))\
-        .set_content(json.dumps(tweet, ensure_ascii=False))\
-        .set_timestamp(timestamp)
-    if not tweet.get("entities", {}).get("media"):
-        logger.debug('No media found, archiving tweet text only')
-        result.status = "twitter-ytdl"
-        return result
-    for i, tw_media in enumerate(tweet["entities"]["media"]):
-        media = Media(filename="")
-        mimetype = ""
-        if tw_media["type"] == "photo":
-            media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
-            mimetype = "image/jpeg"
-        elif tw_media["type"] == "video":
-            variant = choose_variant(tw_media['video_info']['variants'])
-            media.set("src", variant['url'])
-            mimetype = variant['content_type']
-        elif tw_media["type"] == "animated_gif":
-            variant = tw_media['video_info']['variants'][0]
-            media.set("src", variant['url'])
-            mimetype = variant['content_type']
-        ext = mimetypes.guess_extension(mimetype)
-        media.filename = archiver.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
-        result.add_media(media)
-    return result
\ No newline at end of file
+
+
+    def choose_variant(self, variants):
+        # choosing the highest quality possible
+        variant, width, height = None, 0, 0
+        for var in variants:
+            if var.get("content_type", "") == "video/mp4":
+                width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
+                if width_height:
+                    w, h = int(width_height[1]), int(width_height[2])
+                    if w > width or h > height:
+                        width, height = w, h
+                        variant = var
+            else:
+                variant = var if not variant else variant
+        return variant
+
+    def extract_post(self, url: str, ie_instance: InfoExtractor):
+        twid = ie_instance._match_valid_url(url).group('id')
+        return ie_instance._extract_status(twid=twid)
+
+    def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
+        result = Metadata()
+        try:
+            if not tweet.get("user") or not tweet.get("created_at"):
+                raise ValueError(f"Error retrieving post. Are you sure it exists?")
+            timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
+        except (ValueError, KeyError) as ex:
+            logger.warning(f"Unable to parse tweet: {str(ex)}\nRetrieved tweet data: {tweet}")
+            return False
+
+        result\
+            .set_title(tweet.get('full_text', ''))\
+            .set_content(json.dumps(tweet, ensure_ascii=False))\
+            .set_timestamp(timestamp)
+        if not tweet.get("entities", {}).get("media"):
+            logger.debug('No media found, archiving tweet text only')
+            result.status = "twitter-ytdl"
+            return result
+        for i, tw_media in enumerate(tweet["entities"]["media"]):
+            media = Media(filename="")
+            mimetype = ""
+            if tw_media["type"] == "photo":
+                media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
+                mimetype = "image/jpeg"
+            elif tw_media["type"] == "video":
+                variant = self.choose_variant(tw_media['video_info']['variants'])
+                media.set("src", variant['url'])
+                mimetype = variant['content_type']
+            elif tw_media["type"] == "animated_gif":
+                variant = tw_media['video_info']['variants'][0]
+                media.set("src", variant['url'])
+                mimetype = variant['content_type']
+            ext = mimetypes.guess_extension(mimetype)
+            media.filename = archiver.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
+            result.add_media(media)
+        return result
\ No newline at end of file
diff --git a/src/auto_archiver/feeders/csv_feeder.py b/src/auto_archiver/feeders/csv_feeder.py
new file mode 100644
index 0000000..00bf7d7
--- /dev/null
+++ b/src/auto_archiver/feeders/csv_feeder.py
@@ -0,0 +1,47 @@
+from loguru import logger
+import csv
+
+from . import Feeder
+from ..core import Metadata, ArchivingContext
+from ..utils import url_or_none
+
+class CSVFeeder(Feeder):
+
+    @staticmethod
+    def configs() -> dict:
+        return {
+            "files": {
+                "default": None,
+                "help": "Path to the input file(s) to read the URLs from, comma separated. \
+                        Input files should be formatted with one URL per line",
+                "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
+            },
+            "column": {
+                "default": None,
+                "help": "Column number or name to read the URLs from, 0-indexed",
+            }
+        }
+
+
+    def __iter__(self) -> Metadata:
+        url_column = self.column or 0
+        count = 0
+        for file in self.files:
+            with open(file, "r") as f:
+                reader = csv.reader(f)
+                first_row = next(reader)
+                if url_or_none(first_row[url_column]):
+                    # the first row is already a URL (no header row), so process it as well
+                    yield Metadata().set_url(first_row[url_column])
+                    count += 1
+                else:
+                    # it's a header row, skip it
+                    logger.debug(f"Skipping header row: {first_row}")
+                for row in reader:
+                    url = row[url_column]
+                    logger.debug(f"Processing {url}")
+                    yield Metadata().set_url(url)
+                    count += 1
+                ArchivingContext.set("folder", "cli")
+
+        logger.success(f"Processed {count} URL(s)")
\ No newline at end of file
diff --git a/src/auto_archiver/utils/__init__.py b/src/auto_archiver/utils/__init__.py
index 50bddca..36ce765 100644
--- a/src/auto_archiver/utils/__init__.py
+++ b/src/auto_archiver/utils/__init__.py
@@ -7,4 +7,4 @@ from .url import UrlUtil
 from .atlos import get_atlos_config_options
 
 # handy utils from ytdlp
-from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none)
\ No newline at end of file
+from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none, url_or_none)
\ No newline at end of file
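
Illustrative sketch only (not part of this patch): a minimal third dropin written against the GenericDropin interface added in dropin.py. The "Example" platform, its API endpoint, and the post field names (text, created_at, attachments) are hypothetical placeholders; only the method names, signatures, and the Metadata/Archiver calls mirror what this patch introduces.

    from yt_dlp.extractor.common import InfoExtractor

    from auto_archiver.core.metadata import Metadata, Media
    from auto_archiver.archivers.archiver import Archiver
    from .dropin import GenericDropin


    class Example(GenericDropin):

        def extract_post(self, url: str, ie_instance: InfoExtractor) -> dict:
            # Hypothetical: many yt-dlp extractors expose private helpers such as
            # _match_id/_download_json; real dropins (bluesky.py, twitter.py, truth.py)
            # call whichever helper their specific extractor actually provides.
            post_id = ie_instance._match_id(url)
            return ie_instance._download_json(f"https://example.com/api/posts/{post_id}", post_id)

        def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
            result = Metadata()
            result.set_url(url)
            result.set_title(post.get("text", ""))        # hypothetical field name
            result.set_timestamp(post.get("created_at"))  # hypothetical field name
            for attachment in post.get("attachments", []):  # hypothetical field name
                filename = archiver.download_from_url(attachment["url"])
                result.add_media(Media(filename))
            return result

        def skip_ytdlp_download(self, url: str, ie_instance: InfoExtractor) -> bool:
            # Return True to bypass yt-dlp's own download step entirely, as truth.py does.
            return True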