diff --git a/src/auto_archiver/modules/generic_extractor/dropin.py b/src/auto_archiver/modules/generic_extractor/dropin.py
index 723c8fc..8395f09 100644
--- a/src/auto_archiver/modules/generic_extractor/dropin.py
+++ b/src/auto_archiver/modules/generic_extractor/dropin.py
@@ -1,3 +1,4 @@
+from typing import Type
 from yt_dlp.extractor.common import InfoExtractor
 from auto_archiver.core.metadata import Metadata
 from auto_archiver.core.extractor import Extractor
@@ -24,6 +25,8 @@ class GenericDropin:

     """

+    extractor: Type[Extractor] = None
+
     def extract_post(self, url: str, ie_instance: InfoExtractor):
         """
         This method should return the post data from the url.
@@ -55,3 +58,10 @@ class GenericDropin:
         This method should download any additional media from the post.
         """
         return metadata
+
+    def is_suitable(self, url, info_extractor: InfoExtractor):
+        """
+        Used to override the InfoExtractor's 'suitable' method. Dropins should override this method to return True if the URL is suitable for the extractor
+        (e.g. the dropin can parse URLs that the InfoExtractor's own 'suitable' method would reject).
+        """
+        return False
diff --git a/src/auto_archiver/modules/generic_extractor/facebook.py b/src/auto_archiver/modules/generic_extractor/facebook.py
index 36c5e60..e04a862 100644
--- a/src/auto_archiver/modules/generic_extractor/facebook.py
+++ b/src/auto_archiver/modules/generic_extractor/facebook.py
@@ -1,17 +1,154 @@
+import re
 from .dropin import GenericDropin
+from auto_archiver.core.metadata import Metadata
+from yt_dlp.extractor.facebook import FacebookIE
+
+# TODO: Remove if / when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
+from yt_dlp.utils import (
+    clean_html,
+    get_element_by_id,
+    traverse_obj,
+    get_first,
+    merge_dicts,
+    int_or_none,
+    parse_count,
+)
+
+
+def _extract_metadata(self, webpage, video_id):
+    post_data = [
+        self._parse_json(j, video_id, fatal=False)
+        for j in re.findall(r"data-sjs>({.*?ScheduledServerJS.*?})</script>", webpage)
+    ]
+    post = (
+        traverse_obj(
+            post_data,
+            (..., "require", ..., ..., ..., "__bbox", "require", ..., ..., ..., "__bbox", "result", "data"),
+            expected_type=dict,
+        )
+        or []
+    )
+    media = traverse_obj(
+        post,
+        (
+            ...,
+            "attachments",
+            ...,
+            lambda k, v: (k == "media" and str(v["id"]) == video_id and v["__typename"] == "Video"),
+        ),
+        expected_type=dict,
+    )
+    title = get_first(media, ("title", "text"))
+    description = get_first(media, ("creation_story", "comet_sections", "message", "story", "message", "text"))
+    page_title = title or self._html_search_regex(
+        (
+            r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
+            r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
+            self._meta_regex("og:title"),
+            self._meta_regex("twitter:title"),
+            r"<title>(?P<content>.+?)</title>",
+        ),
+        webpage,
+        "title",
+        default=None,
+        group="content",
+    )
+    description = description or self._html_search_meta(
+        ["description", "og:description", "twitter:description"], webpage, "description", default=None
+    )
+    uploader_data = (
+        get_first(media, ("owner", {dict}))
+        or get_first(
+            post, ("video", "creation_story", "attachments", ..., "media", lambda k, v: k == "owner" and v["name"])
+        )
+        or get_first(post, (..., "video", lambda k, v: k == "owner" and v["name"]))
+        or get_first(post, ("node", "actors", ..., {dict}))
+        or get_first(post, ("event", "event_creator", {dict}))
+        or get_first(post, ("video", "creation_story", "short_form_video_context", "video_owner", {dict}))
+        or {}
+    )
+    uploader = uploader_data.get("name") or (
+        clean_html(get_element_by_id("fbPhotoPageAuthorName", webpage))
+        or self._search_regex(
+            (r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes("title")), webpage, "uploader", fatal=False
+        )
+    )
+    timestamp = int_or_none(self._search_regex(r'<abbr[^>]+data-utime=["\'](\d+)', webpage, "timestamp", default=None))
+    thumbnail = self._html_search_meta(["og:image", "twitter:image"], webpage, "thumbnail", default=None)
+    # some webpages contain unretrievable thumbnail urls
+    # like https://lookaside.fbsbx.com/lookaside/crawler/media/?media_id=10155168902769113&get_thumbnail=1
+    # in https://www.facebook.com/yaroslav.korpan/videos/1417995061575415/
+    if thumbnail and not re.search(r"\.(?:jpg|png)", thumbnail):
+        thumbnail = None
+    info_dict = {
+        "description": description,
+        "uploader": uploader,
+        "uploader_id": uploader_data.get("id"),
+        "timestamp": timestamp,
+        "thumbnail": thumbnail,
+        "view_count": parse_count(
+            self._search_regex(
+                (r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
+                webpage,
+                "view count",
+                default=None,
+            )
+        ),
+        "concurrent_view_count": get_first(
+            post, (("video", (..., ..., "attachments", ..., "media")), "liveViewerCount", {int_or_none})
+        ),
+        **traverse_obj(
+            post,
+            (
+                lambda _, v: video_id in v["url"],
+                "feedback",
+                {
+                    "like_count": ("likers", "count", {int}),
+                    "comment_count": ("total_comment_count", {int}),
+                    "repost_count": ("share_count_reduced", {parse_count}),
+                },
+            ),
+            get_all=False,
+        ),
+    }
+
+    info_json_ld = self._search_json_ld(webpage, video_id, default={})
+    info_json_ld["title"] = (
+        re.sub(r"\s*\|\s*Facebook$", "", title or info_json_ld.get("title") or page_title or "")
+        or (description or "").replace("\n", " ")
+        or f"Facebook video #{video_id}"
+    )
+    return merge_dicts(info_json_ld, info_dict)


 class Facebook(GenericDropin):
-    def extract_post(self, url: str, ie_instance):
-        video_id = ie_instance._match_valid_url(url).group("id")
-        ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), video_id)
-        webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group("id"))
+    def extract_post(self, url: str, ie_instance: FacebookIE):
+        post_id_regex = r"(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))"
+        post_id = re.search(post_id_regex, url).group("id")
+        webpage = ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), post_id)

-        # TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
-        post_data = ie_instance._extract_metadata(webpage)
+        # TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ...
+
+        # TODO: If/when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged, uncomment next line and delete the one after
+        # post_data = ie_instance._extract_metadata(webpage, post_id)
+        post_data = _extract_metadata(ie_instance, webpage, post_id)
         return post_data

-    def create_metadata(self, post: dict, ie_instance, archiver, url):
-        metadata = archiver.create_metadata(url)
-        metadata.set_title(post.get("title")).set_content(post.get("description")).set_post_data(post)
-        return metadata
+    def create_metadata(self, post: dict, ie_instance: FacebookIE, archiver, url):
+        result = Metadata()
+        result.set_content(post.get("description", ""))
+        result.set_title(post.get("title", ""))
+        result.set("author", post.get("uploader", ""))
+        result.set_url(url)
+        return result
+
+    def is_suitable(self, url, info_extractor: FacebookIE):
+        regex = r"(?:https?://(?:[\w-]+\.)?(?:facebook\.com|facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)"
+        return re.match(regex, url)
+
+    def skip_ytdlp_download(self, url: str, ie_instance: FacebookIE):
+        """
+        Skip the ytdlp download for Facebook *photo* posts, which have a URL with an id of t.XXXXX/XXXXX
+        """
+        if re.search(r"/t\.\d+/\d+", url):
+            return True
diff --git a/src/auto_archiver/modules/generic_extractor/generic_extractor.py b/src/auto_archiver/modules/generic_extractor/generic_extractor.py
index 534fb71..2f44ba8 100644
--- a/src/auto_archiver/modules/generic_extractor/generic_extractor.py
+++ b/src/auto_archiver/modules/generic_extractor/generic_extractor.py
@@ -67,8 +67,18 @@ class GenericExtractor(Extractor):
         """ Returns a list of valid extractors for the given URL"""
         for info_extractor in yt_dlp.YoutubeDL()._ies.values():
-            if info_extractor.suitable(url) and info_extractor.working():
+            if not info_extractor.working():
+                continue
+
+            # check if there's a dropin and see if that declares whether it's suitable
+            dropin = self.dropin_for_name(info_extractor.ie_key())
+            if dropin and dropin.is_suitable(url, info_extractor):
                 yield info_extractor
+                continue
+
+            if info_extractor.suitable(url):
+                yield info_extractor
+                continue

     def suitable(self, url: str) -> bool:
         """
@@ -188,9 +198,13 @@ class GenericExtractor(Extractor):
         result = self.download_additional_media(video_data, info_extractor, result)

         # keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
-        result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
-        result.set_url(url)
-        if "description" in video_data:
+        if not result.get_title():
+            result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
+
+        if not result.get("url"):
+            result.set_url(url)
+
+        if "description" in video_data and not result.get_content():
             result.set_content(video_data["description"])
         # extract comments if enabled
         if self.comments:
@@ -207,10 +221,10 @@ class GenericExtractor(Extractor):
             )

         # then add the common metadata
-        if timestamp := video_data.pop("timestamp", None):
+        if (timestamp := video_data.pop("timestamp", None)) and not result.get("timestamp"):
             timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
             result.set_timestamp(timestamp)
-        if upload_date := video_data.pop("upload_date", None):
+        if (upload_date := video_data.pop("upload_date", None)) and not result.get("upload_date"):
             upload_date = datetime.datetime.strptime(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
             result.set("upload_date", upload_date)

@@ -240,7 +254,8 @@ class GenericExtractor(Extractor):
             return False

         post_data = dropin.extract_post(url, ie_instance)
-        return dropin.create_metadata(post_data, ie_instance, self, url)
+        result = dropin.create_metadata(post_data, ie_instance, self, url)
+        return self.add_metadata(post_data, info_extractor, url, result)

     def get_metadata_for_video(
         self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL
@@ -296,6 +311,7 @@ class GenericExtractor(Extractor):

         def _load_dropin(dropin):
             dropin_class = getattr(dropin, dropin_class_name)()
+            dropin.extractor = self
             return self._dropins.setdefault(dropin_name, dropin_class)

         try:
@@ -340,7 +356,7 @@ class GenericExtractor(Extractor):
         dropin_submodule = self.dropin_for_name(info_extractor.ie_key())

         try:
-            if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
+            if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor):
                 logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
                 raise SkipYtdlp()

@@ -359,7 +375,7 @@ class GenericExtractor(Extractor):

             if not isinstance(e, SkipYtdlp):
                 logger.debug(
-                    f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead'
+                    f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead'
                 )

             try:
diff --git a/src/auto_archiver/modules/generic_extractor/tiktok.py b/src/auto_archiver/modules/generic_extractor/tiktok.py
index e05d298..b25abca 100644
--- a/src/auto_archiver/modules/generic_extractor/tiktok.py
+++ b/src/auto_archiver/modules/generic_extractor/tiktok.py
@@ -38,6 +38,9 @@ class Tiktok(GenericDropin):
         api_data["video_url"] = video_url
         return api_data

+    def keys_to_clean(self, video_data: dict, info_extractor):
+        return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
+
     def create_metadata(self, post: dict, ie_instance, archiver, url):
         # prepare result, start by downloading video
         result = Metadata()
@@ -54,17 +57,17 @@ class Tiktok(GenericDropin):
             logger.error(f"failed to download video from {video_url}")
             return False
         video_media = Media(video_downloaded)
-        if duration := post.pop("duration", None):
+        if duration := post.get("duration", None):
             video_media.set("duration", duration)
         result.add_media(video_media)

         # add remaining metadata
-        result.set_title(post.pop("title", ""))
+        result.set_title(post.get("title", ""))

-        if created_at := post.pop("create_time", None):
+        if created_at := post.get("create_time", None):
             result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))

-        if author := post.pop("author", None):
+        if author := post.get("author", None):
             result.set("author", author)

         result.set("api_data", post)
diff --git a/src/auto_archiver/modules/local_storage/__manifest__.py b/src/auto_archiver/modules/local_storage/__manifest__.py
index b5307b8..3a7f481 100644
--- a/src/auto_archiver/modules/local_storage/__manifest__.py
+++ b/src/auto_archiver/modules/local_storage/__manifest__.py
@@ -20,7 +20,7 @@
         "save_absolute": {
             "default": False,
             "type": "bool",
-            "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
+            "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)",
         },
     },
     "description": """
diff --git a/tests/extractors/test_generic_extractor.py b/tests/extractors/test_generic_extractor.py
index ec7aae4..2089007 100644
--- a/tests/extractors/test_generic_extractor.py
+++ b/tests/extractors/test_generic_extractor.py
@@ -40,6 +40,22 @@ class TestGenericExtractor(TestExtractorBase):
         path = os.path.join(dirname(dirname(__file__)), "data/")
         assert self.extractor.dropin_for_name("dropin", additional_paths=[path])

+    @pytest.mark.parametrize(
+        "url, suitable_extractors",
+        [
+            ("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
+            ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
+            ("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
+            ("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
+            ("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
+        ],
+    )
+    def test_suitable_extractors(self, url, suitable_extractors):
+        suitable_extractors = suitable_extractors + ["generic"]  # the generic is valid for all
+        extractors = list(self.extractor.suitable_extractors(url))
+        assert len(extractors) == len(suitable_extractors)
+        assert [e.ie_key().lower() for e in extractors] == suitable_extractors
+
     @pytest.mark.parametrize(
         "url, is_suitable",
         [
@@ -55,7 +71,7 @@ class TestGenericExtractor(TestExtractorBase):
             ("https://google.com", True),
         ],
     )
-    def test_suitable_urls(self, make_item, url, is_suitable):
+    def test_suitable_urls(self, url, is_suitable):
         """
         Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
         This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
@@ -245,3 +261,32 @@ class TestGenericExtractor(TestExtractorBase):
         self.assertValidResponseMetadata(post, title, timestamp)
         assert len(post.media) == 1
         assert post.media[0].hash == image_hash
+
+    @pytest.mark.download
+    def test_download_facebook_video(self, make_item):
+        post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133"))
+        assert len(post.media) == 2
+        assert post.media[0].filename.endswith("588371253839133.mp4")
+        assert post.media[0].mimetype == "video/mp4"
+
+        assert post.media[1].filename.endswith(".jpg")
+        assert post.media[1].mimetype == "image/jpeg"
+
+        assert "Bellingchat Premium is with Kolina Koltai" in post.get_title()
+
+    @pytest.mark.download
+    def test_download_facebook_image(self, make_item):
+        post = self.extractor.download(
+            make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")
+        )
+
+        assert len(post.media) == 1
+        assert post.media[0].filename.endswith(".png")
+        assert "Byline Festival - BylineFest Partner" == post.get_title()
+
+    @pytest.mark.download
+    def test_download_facebook_text_only(self, make_item):
+        url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl"
+        post = self.extractor.download(make_item(url))
+        assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get("content")
+        assert post.get_title() == "Bellingcat"
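
Reviewer note (not part of the patch): a minimal sketch of how a new dropin could plug into the hooks this diff adds to GenericDropin. The site, class name, URL patterns and parsed fields are hypothetical; only the hook names and signatures (is_suitable, skip_ytdlp_download, extract_post, create_metadata) and the InfoExtractor/Metadata calls mirror the changes above.

import re

from auto_archiver.core.metadata import Metadata
from .dropin import GenericDropin


class ExampleSite(GenericDropin):
    def is_suitable(self, url, info_extractor):
        # Declare the URLs this dropin handles, including ones the stock
        # InfoExtractor's own suitable() check would reject (hypothetical pattern).
        return bool(re.match(r"https?://(?:www\.)?example\.com/", url))

    def skip_ytdlp_download(self, url, ie_instance):
        # Returning True makes the generic extractor skip yt-dlp's downloader
        # (via SkipYtdlp) and go straight to extract_post()/create_metadata().
        return "/photos/" in url

    def extract_post(self, url, ie_instance):
        # Fetch the page with the InfoExtractor's helpers and pull out the
        # fields that create_metadata() needs.
        webpage = ie_instance._download_webpage(url, "example-post")
        title = ie_instance._html_search_meta(["og:title", "twitter:title"], webpage, "title", default="")
        description = ie_instance._html_search_meta(["og:description"], webpage, "description", default="")
        return {"title": title, "description": description}

    def create_metadata(self, post: dict, ie_instance, archiver, url):
        result = Metadata()
        result.set_url(url)
        result.set_title(post.get("title", ""))
        result.set_content(post.get("description", ""))
        return result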