Merge pull request #223 from bellingcat/facebook_extractor

Create facebook dropin - working for images + text.
pull/256/head
Patrick Robertson 2025-03-17 12:45:05 +00:00 zatwierdzone przez GitHub
commit 3d4056ef70
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
6 zmienionych plików z 236 dodań i 25 usunięć

Wyświetl plik

@ -1,3 +1,4 @@
from typing import Type
from yt_dlp.extractor.common import InfoExtractor
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.extractor import Extractor
@ -24,6 +25,8 @@ class GenericDropin:
"""
extractor: Type[Extractor] = None
def extract_post(self, url: str, ie_instance: InfoExtractor):
"""
This method should return the post data from the url.
@ -55,3 +58,10 @@ class GenericDropin:
This method should download any additional media from the post.
"""
return metadata
def is_suitable(self, url, info_extractor: InfoExtractor):
"""
Used to override the InfoExtractor's 'is_suitable' method. Dropins should override this method to return True if the url is suitable for the extractor
(based on being able to parse other URLs)
"""
return False

Wyświetl plik

@ -1,17 +1,154 @@
import re
from .dropin import GenericDropin
from auto_archiver.core.metadata import Metadata
from yt_dlp.extractor.facebook import FacebookIE
# TODO: Remove if / when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
from yt_dlp.utils import (
clean_html,
get_element_by_id,
traverse_obj,
get_first,
merge_dicts,
int_or_none,
parse_count,
)
def _extract_metadata(self, webpage, video_id):
post_data = [
self._parse_json(j, video_id, fatal=False)
for j in re.findall(r"data-sjs>({.*?ScheduledServerJS.*?})</script>", webpage)
]
post = (
traverse_obj(
post_data,
(..., "require", ..., ..., ..., "__bbox", "require", ..., ..., ..., "__bbox", "result", "data"),
expected_type=dict,
)
or []
)
media = traverse_obj(
post,
(
...,
"attachments",
...,
lambda k, v: (k == "media" and str(v["id"]) == video_id and v["__typename"] == "Video"),
),
expected_type=dict,
)
title = get_first(media, ("title", "text"))
description = get_first(media, ("creation_story", "comet_sections", "message", "story", "message", "text"))
page_title = title or self._html_search_regex(
(
r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
self._meta_regex("og:title"),
self._meta_regex("twitter:title"),
r"<title>(?P<content>.+?)</title>",
),
webpage,
"title",
default=None,
group="content",
)
description = description or self._html_search_meta(
["description", "og:description", "twitter:description"], webpage, "description", default=None
)
uploader_data = (
get_first(media, ("owner", {dict}))
or get_first(
post, ("video", "creation_story", "attachments", ..., "media", lambda k, v: k == "owner" and v["name"])
)
or get_first(post, (..., "video", lambda k, v: k == "owner" and v["name"]))
or get_first(post, ("node", "actors", ..., {dict}))
or get_first(post, ("event", "event_creator", {dict}))
or get_first(post, ("video", "creation_story", "short_form_video_context", "video_owner", {dict}))
or {}
)
uploader = uploader_data.get("name") or (
clean_html(get_element_by_id("fbPhotoPageAuthorName", webpage))
or self._search_regex(
(r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes("title")), webpage, "uploader", fatal=False
)
)
timestamp = int_or_none(self._search_regex(r'<abbr[^>]+data-utime=["\'](\d+)', webpage, "timestamp", default=None))
thumbnail = self._html_search_meta(["og:image", "twitter:image"], webpage, "thumbnail", default=None)
# some webpages contain unretrievable thumbnail urls
# like https://lookaside.fbsbx.com/lookaside/crawler/media/?media_id=10155168902769113&get_thumbnail=1
# in https://www.facebook.com/yaroslav.korpan/videos/1417995061575415/
if thumbnail and not re.search(r"\.(?:jpg|png)", thumbnail):
thumbnail = None
info_dict = {
"description": description,
"uploader": uploader,
"uploader_id": uploader_data.get("id"),
"timestamp": timestamp,
"thumbnail": thumbnail,
"view_count": parse_count(
self._search_regex(
(r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
webpage,
"view count",
default=None,
)
),
"concurrent_view_count": get_first(
post, (("video", (..., ..., "attachments", ..., "media")), "liveViewerCount", {int_or_none})
),
**traverse_obj(
post,
(
lambda _, v: video_id in v["url"],
"feedback",
{
"like_count": ("likers", "count", {int}),
"comment_count": ("total_comment_count", {int}),
"repost_count": ("share_count_reduced", {parse_count}),
},
),
get_all=False,
),
}
info_json_ld = self._search_json_ld(webpage, video_id, default={})
info_json_ld["title"] = (
re.sub(r"\s*\|\s*Facebook$", "", title or info_json_ld.get("title") or page_title or "")
or (description or "").replace("\n", " ")
or f"Facebook video #{video_id}"
)
return merge_dicts(info_json_ld, info_dict)
class Facebook(GenericDropin):
def extract_post(self, url: str, ie_instance):
video_id = ie_instance._match_valid_url(url).group("id")
ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), video_id)
webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group("id"))
def extract_post(self, url: str, ie_instance: FacebookIE):
post_id_regex = r"(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))"
post_id = re.search(post_id_regex, url).group("id")
webpage = ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), post_id)
# TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
post_data = ie_instance._extract_metadata(webpage)
# TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ...
# TODO: If/when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged, uncomment next line and delete the one after
# post_data = ie_instance._extract_metadata(webpage, post_id)
post_data = _extract_metadata(ie_instance, webpage, post_id)
return post_data
def create_metadata(self, post: dict, ie_instance, archiver, url):
metadata = archiver.create_metadata(url)
metadata.set_title(post.get("title")).set_content(post.get("description")).set_post_data(post)
return metadata
def create_metadata(self, post: dict, ie_instance: FacebookIE, archiver, url):
result = Metadata()
result.set_content(post.get("description", ""))
result.set_title(post.get("title", ""))
result.set("author", post.get("uploader", ""))
result.set_url(url)
return result
def is_suitable(self, url, info_extractor: FacebookIE):
regex = r"(?:https?://(?:[\w-]+\.)?(?:facebook\.com||facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)"
return re.match(regex, url)
def skip_ytdlp_download(self, url: str, is_instance: FacebookIE):
"""
Skip using the ytdlp download method for Facebook *photo* posts, they have a URL with an id of t.XXXXX/XXXXX
"""
if re.search(r"/t.\d+/\d+", url):
return True

Wyświetl plik

@ -67,8 +67,18 @@ class GenericExtractor(Extractor):
"""
Returns a list of valid extractors for the given URL"""
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
if info_extractor.suitable(url) and info_extractor.working():
if not info_extractor.working():
continue
# check if there's a dropin and see if that declares whether it's suitable
dropin = self.dropin_for_name(info_extractor.ie_key())
if dropin and dropin.is_suitable(url, info_extractor):
yield info_extractor
continue
if info_extractor.suitable(url):
yield info_extractor
continue
def suitable(self, url: str) -> bool:
"""
@ -188,9 +198,13 @@ class GenericExtractor(Extractor):
result = self.download_additional_media(video_data, info_extractor, result)
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
result.set_url(url)
if "description" in video_data:
if not result.get_title():
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
if not result.get("url"):
result.set_url(url)
if "description" in video_data and not result.get_content():
result.set_content(video_data["description"])
# extract comments if enabled
if self.comments:
@ -207,10 +221,10 @@ class GenericExtractor(Extractor):
)
# then add the common metadata
if timestamp := video_data.pop("timestamp", None):
if timestamp := video_data.pop("timestamp", None) and not result.get("timestamp"):
timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if upload_date := video_data.pop("upload_date", None):
if upload_date := video_data.pop("upload_date", None) and not result.get("upload_date"):
upload_date = datetime.datetime.strptime(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
result.set("upload_date", upload_date)
@ -240,7 +254,8 @@ class GenericExtractor(Extractor):
return False
post_data = dropin.extract_post(url, ie_instance)
return dropin.create_metadata(post_data, ie_instance, self, url)
result = dropin.create_metadata(post_data, ie_instance, self, url)
return self.add_metadata(post_data, info_extractor, url, result)
def get_metadata_for_video(
self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL
@ -296,6 +311,7 @@ class GenericExtractor(Extractor):
def _load_dropin(dropin):
dropin_class = getattr(dropin, dropin_class_name)()
dropin.extractor = self
return self._dropins.setdefault(dropin_name, dropin_class)
try:
@ -340,7 +356,7 @@ class GenericExtractor(Extractor):
dropin_submodule = self.dropin_for_name(info_extractor.ie_key())
try:
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor):
logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
raise SkipYtdlp()
@ -359,7 +375,7 @@ class GenericExtractor(Extractor):
if not isinstance(e, SkipYtdlp):
logger.debug(
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead'
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead'
)
try:

Wyświetl plik

@ -38,6 +38,9 @@ class Tiktok(GenericDropin):
api_data["video_url"] = video_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
@ -54,17 +57,17 @@ class Tiktok(GenericDropin):
logger.error(f"failed to download video from {video_url}")
return False
video_media = Media(video_downloaded)
if duration := post.pop("duration", None):
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
# add remaining metadata
result.set_title(post.pop("title", ""))
result.set_title(post.get("title", ""))
if created_at := post.pop("create_time", None):
if created_at := post.get("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.pop("author", None):
if author := post.get("author", None):
result.set("author", author)
result.set("api_data", post)

Wyświetl plik

@ -20,7 +20,7 @@
"save_absolute": {
"default": False,
"type": "bool",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)",
},
},
"description": """

Wyświetl plik

@ -40,6 +40,22 @@ class TestGenericExtractor(TestExtractorBase):
path = os.path.join(dirname(dirname(__file__)), "data/")
assert self.extractor.dropin_for_name("dropin", additional_paths=[path])
@pytest.mark.parametrize(
"url, suitable_extractors",
[
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
],
)
def test_suitable_extractors(self, url, suitable_extractors):
suitable_extractors = suitable_extractors + ["generic"] # the generic is valid for all
extractors = list(self.extractor.suitable_extractors(url))
assert len(extractors) == len(suitable_extractors)
assert [e.ie_key().lower() for e in extractors] == suitable_extractors
@pytest.mark.parametrize(
"url, is_suitable",
[
@ -55,7 +71,7 @@ class TestGenericExtractor(TestExtractorBase):
("https://google.com", True),
],
)
def test_suitable_urls(self, make_item, url, is_suitable):
def test_suitable_urls(self, url, is_suitable):
"""
Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
@ -245,3 +261,32 @@ class TestGenericExtractor(TestExtractorBase):
self.assertValidResponseMetadata(post, title, timestamp)
assert len(post.media) == 1
assert post.media[0].hash == image_hash
@pytest.mark.download
def test_download_facebook_video(self, make_item):
post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133"))
assert len(post.media) == 2
assert post.media[0].filename.endswith("588371253839133.mp4")
assert post.media[0].mimetype == "video/mp4"
assert post.media[1].filename.endswith(".jpg")
assert post.media[1].mimetype == "image/jpeg"
assert "Bellingchat Premium is with Kolina Koltai" in post.get_title()
@pytest.mark.download
def test_download_facebook_image(self, make_item):
post = self.extractor.download(
make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")
)
assert len(post.media) == 1
assert post.media[0].filename.endswith(".png")
assert "Byline Festival - BylineFest Partner" == post.get_title()
@pytest.mark.download
def test_download_facebook_text_only(self, make_item):
url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl"
post = self.extractor.download(make_item(url))
assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get("content")
assert post.get_title() == "Bellingcat"