Mirror of https://github.com/bellingcat/auto-archiver
Compare commits
6 commits
ef5b39c4f1 ... ccf5f857ef
Author | SHA1 | Date
---|---|---
msramalho | ccf5f857ef |
msramalho | 7de317d1b5 |
msramalho | 70075a1e5e |
msramalho | 5b9bc4919a |
msramalho | f0158ffd9c |
msramalho | bfb35a43a9 |
@@ -22,6 +22,7 @@ class InstagramAPIArchiver(Archiver):
        super().__init__(config)
        self.assert_valid_string("access_token")
        self.assert_valid_string("api_endpoint")
+       self.full_profile_max_posts = int(self.full_profile_max_posts)
        if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1]

        self.full_profile = bool(self.full_profile)

@@ -33,6 +34,7 @@ class InstagramAPIArchiver(Archiver):
            "access_token": {"default": None, "help": "a valid instagrapi-api token"},
            "api_endpoint": {"default": None, "help": "API endpoint to use"},
            "full_profile": {"default": False, "help": "if true, will download all posts, tagged posts, stories, and highlights for a profile; if false, will only download the profile pic and information."},
+           "full_profile_max_posts": {"default": 0, "help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. The limit is applied softly, since posts are fetched in batches, once each for posts, tagged posts, and highlights."},
            "minimize_json_output": {"default": True, "help": "if true, will remove empty values from the json output"},
        }

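For orientation, a hypothetical set of values for these options might look like the snippet below. The keys come from `configs()` above; the concrete values and the way auto-archiver's orchestration hands them to `__init__` are illustrative assumptions, not something shown in this diff.

```python
# Illustrative only: keys mirror configs() above, values are made up.
instagram_api_config = {
    "access_token": "<a valid instagrapi-api token>",     # validated by assert_valid_string in __init__
    "api_endpoint": "https://instagrapi.example.com/",    # a trailing "/" is stripped in __init__
    "full_profile": True,                                  # also fetch posts, tagged posts, stories, highlights
    "full_profile_max_posts": 50,                          # soft limit per category; 0 means no limit
    "minimize_json_output": True,                          # drop empty values from the stored JSON
}
```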
@@ -73,9 +75,9 @@ class InstagramAPIArchiver(Archiver):
        if type(d) == list: return [self.cleanup_dict(v) for v in d]
        if type(d) != dict: return d
        return {
-           k: self.cleanup_dict(v) if type(v) in [dict, list] else v
+           k: clean_v
            for k, v in d.items()
-           if v not in [0.0, 0, [], {}, "", None, "null"] and
+           if (clean_v := self.cleanup_dict(v)) not in [0.0, 0, [], {}, "", None, "null"] and
            k not in ["x", "y", "width", "height"]
        }

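The change to `cleanup_dict` uses an assignment expression so the recursive cleanup runs only once per value: the cleaned value is bound in the filter clause and reused as the emitted value, instead of filtering on the raw value and cleaning it again. A minimal standalone sketch of the same pattern, with a simplified filter list rather than the archiver's exact rules:

```python
def cleanup(d):
    """Recursively drop empty-ish values, computing the cleaned child only once."""
    if isinstance(d, list):
        return [cleanup(v) for v in d]
    if not isinstance(d, dict):
        return d
    return {
        k: clean_v                    # reuse the value bound in the filter below
        for k, v in d.items()
        if (clean_v := cleanup(v)) not in [0, 0.0, "", None, [], {}]
    }

print(cleanup({"a": {"b": "", "c": 1}, "d": []}))   # -> {'a': {'c': 1}}
```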
@@ -93,9 +95,6 @@ class InstagramAPIArchiver(Archiver):

        if self.full_profile:
            user_id = user.get("pk")
-           # download all posts
-           self.download_all_posts(result, user_id)
-
            # download all stories
            try:
                stories = self._download_stories_reusable(result, username)

@@ -104,25 +103,46 @@ class InstagramAPIArchiver(Archiver):
                result.append("errors", f"Error downloading stories for {username}")
                logger.error(f"Error downloading stories for {username}: {e}")

+           # download all posts
+           try:
+               self.download_all_posts(result, user_id)
+           except Exception as e:
+               result.append("errors", f"Error downloading posts for {username}")
+               logger.error(f"Error downloading posts for {username}: {e}")

+           # download all tagged
+           try:
+               self.download_all_tagged(result, user_id)
+           except Exception as e:
+               result.append("errors", f"Error downloading tagged posts for {username}")
+               logger.error(f"Error downloading tagged posts for {username}: {e}")

            # download all highlights
            try:
-               count_highlights = 0
-               highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
-               for h in highlights:
-                   try:
-                       h_info = self._download_highlights_reusable(result, h.get("pk"))
-                       count_highlights += len(h_info.get("items", []))
-                   except Exception as e:
-                       result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
-                       logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
-               result.set("#highlights", count_highlights)
+               self.download_all_highlights(result, username, user_id)
            except Exception as e:
                result.append("errors", f"Error downloading highlights for {username}")
                logger.error(f"Error downloading highlights for {username}: {e}")

        result.set_url(url) # reset as scrape_item modifies it
        return result.success("insta profile")

+   def download_all_highlights(self, result, username, user_id):
+       count_highlights = 0
+       highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
+       for h in highlights:
+           try:
+               h_info = self._download_highlights_reusable(result, h.get("pk"))
+               count_highlights += len(h_info.get("items", []))
+           except Exception as e:
+               result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
+               logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
+           if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts:
+               logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}")
+               break
+       result.set("#highlights", count_highlights)

    def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
        if id:
            post = self.call_api(f"v1/media/by/id", {"id": id})

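The effect of this restructuring is that each content category (stories, posts, tagged posts, highlights) runs in its own try/except: a failure in one category is recorded on the result and logged, while the remaining categories are still archived. A stripped-down sketch of that record-and-continue pattern; the `archive_profile` helper and its toy sections are purely illustrative:

```python
import logging

logger = logging.getLogger(__name__)

def archive_profile(sections: dict, errors: list) -> None:
    """Run each section independently; record and log failures, keep going."""
    for name, fn in sections.items():
        try:
            fn()
        except Exception as e:
            errors.append(f"Error downloading {name}")
            logger.error(f"Error downloading {name}: {e}")

errors = []
archive_profile({
    "posts": lambda: print("download posts"),
    "tagged posts": lambda: 1 / 0,          # simulated failure
    "highlights": lambda: print("download highlights"),
}, errors)
print(errors)   # -> ['Error downloading tagged posts']
```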
@@ -188,7 +208,7 @@ class InstagramAPIArchiver(Archiver):
        post_count = 0
        while end_cursor != "":
            posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
-           if not len(posts): break
+           if not len(posts) or not type(posts) == list or len(posts) != 2: break
            posts, end_cursor = posts[0], posts[1]
            logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")

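The chunk endpoint is treated as returning a two-element payload, `[posts, end_cursor]`: the loop unpacks it, processes the batch, and re-queries with the new cursor until the cursor comes back empty, and the tightened guard above bails out if the response is not exactly that shape. A small sketch of the same cursor loop against a stand-in fetch function; `fetch_chunk`, its fake pages, and the initial cursor value are assumptions for illustration (the real initial value is set outside this hunk):

```python
def fetch_chunk(user_id: str, end_cursor):
    """Stand-in for the 'v1/user/medias/chunk' call: returns [posts, next_cursor]."""
    pages = {None: ([{"id": 1}, {"id": 2}], "abc"), "abc": ([{"id": 3}], "")}
    posts, nxt = pages.get(end_cursor, ([], ""))
    return [posts, nxt]

def iter_all_posts(user_id: str):
    end_cursor = None            # first request carries no cursor
    while end_cursor != "":      # an empty cursor marks the last page
        resp = fetch_chunk(user_id, end_cursor)
        if not resp or not isinstance(resp, list) or len(resp) != 2:
            break                # defensive: unexpected payload shape ends the loop
        posts, end_cursor = resp
        if not posts:
            break
        yield from posts

print([p["id"] for p in iter_all_posts("123")])   # -> [1, 2, 3]
```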
@@ -199,7 +219,35 @@ class InstagramAPIArchiver(Archiver):
                    logger.error(f"Error downloading post, skipping {p.get('id')}: {e}")
                pbar.update(1)
                post_count+=1
+               if self.full_profile_max_posts and post_count >= self.full_profile_max_posts:
+                   logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}")
+                   break
        result.set("#posts", post_count)

+   def download_all_tagged(self, result: Metadata, user_id: str):
+       next_page_id = ""
+       pbar = tqdm(desc="downloading tagged posts")

+       tagged_count = 0
+       while next_page_id != None:
+           resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
+           posts = resp.get("response", {}).get("items", [])
+           if not len(posts): break
+           next_page_id = resp.get("next_page_id")

+           logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")

+           for p in posts:
+               try: self.scrape_item(result, p, "tagged")
+               except Exception as e:
+                   result.append("errors", f"Error downloading tagged post {p.get('id')}")
+                   logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}")
+               pbar.update(1)
+               tagged_count+=1
+               if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts:
+                   logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}")
+                   break
+       result.set("#tagged", tagged_count)

    ### reusable parsing utils below

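As the `full_profile_max_posts` help text says, the cap is soft: it is applied separately to posts, tagged posts, and highlights, and for highlights it is only checked after a whole reel's items have been counted, so the stored totals can exceed the configured number. A tiny sketch of that count-then-break behaviour with made-up batch sizes:

```python
def consume_with_soft_limit(batches, max_items):
    """Count items batch by batch and stop once the soft limit is reached.

    The check runs after a whole batch is added (as with highlight reels above),
    so the final count can overshoot max_items by up to one batch.
    """
    count = 0
    for batch in batches:
        count += len(batch)
        if max_items and count >= max_items:
            print(f"reached soft limit {max_items}")
            break
    return count

# three reels of 4 items each with a limit of 10: stops after the 3rd reel at 12
print(consume_with_soft_limit([[1] * 4, [2] * 4, [3] * 4], 10))   # -> 12
```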
@@ -217,10 +265,10 @@ class InstagramAPIArchiver(Archiver):
        if self.minimize_json_output:
            del item["clips_metadata"]

-       if code := item.get("code"):
-           result.set("url", f"https://www.instagram.com/p/{code}/")
+       if code := item.get("code") and not result.get("url"):
+           result.set_url(f"https://www.instagram.com/p/{code}/")

-       resources = item.get("resources", [])
+       resources = item.get("resources", item.get("carousel_media", []))
        item, media, media_id = self.scrape_media(item, context)
        # if resources are present take the main media from the first resource
        if not media and len(resources):

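One thing worth flagging about the new condition as it appears here: a Python assignment expression binds everything to its right, so `code := item.get("code") and not result.get("url")` assigns the value of the whole `and` expression to `code` rather than the post shortcode. If the intent is to keep `code` as the shortcode while also requiring that no URL is set yet, the walrus part needs its own parentheses; a quick illustration with stand-in values:

```python
item = {"code": "AbC123"}
existing_url = None

# As written above: `code` receives the result of the whole `and` expression.
if code := item.get("code") and not existing_url:
    print(code)              # -> True  (a boolean, not the shortcode)

# Parenthesized: `code` receives the shortcode, the URL check stays separate.
if (code := item.get("code")) and not existing_url:
    print(code)              # -> AbC123
```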
@@ -242,7 +290,7 @@ class InstagramAPIArchiver(Archiver):
    def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]:
        # remove unnecessary info
        if self.minimize_json_output:
-           for k in ["image_versions", "video_versions", "video_dash_manifest"]:
+           for k in ["image_versions", "video_versions", "video_dash_manifest", "image_versions2", "video_versions2"]:
                if k in item: del item[k]
        item = self.cleanup_dict(item)

@@ -253,19 +301,24 @@ class InstagramAPIArchiver(Archiver):

        # retrieve video info
        best_id = item.get('id', item.get('pk'))
-       taken_at = item.get("taken_at")
+       taken_at = item.get("taken_at", item.get("taken_at_ts"))
        code = item.get("code")
        caption_text = item.get("caption_text")
        if "carousel_media" in item: del item["carousel_media"]

        if video_url := item.get("video_url"):
            filename = self.download_from_url(video_url, verbose=False)
            video_media = Media(filename=filename)
            if taken_at: video_media.set("date", taken_at)
            if code: video_media.set("url", f"https://www.instagram.com/p/{code}")
            if caption_text: video_media.set("text", caption_text)
            video_media.set("preview", [image_media])
            video_media.set("data", [item])
            return item, video_media, f"{context or 'video'} {best_id}"
        elif image_media:
            if taken_at: image_media.set("date", taken_at)
            if code: image_media.set("url", f"https://www.instagram.com/p/{code}")
            if caption_text: image_media.set("text", caption_text)
            image_media.set("data", [item])
            return item, image_media, f"{context or 'image'} {best_id}"

@@ -15,6 +15,8 @@ class YoutubeDLArchiver(Archiver):
        self.livestreams = bool(self.livestreams)
        self.live_from_start = bool(self.live_from_start)
        self.end_means_success = bool(self.end_means_success)
+       self.allow_playlist = bool(self.allow_playlist)
+       self.max_downloads = self.max_downloads

    @staticmethod
    def configs() -> dict:

@@ -26,6 +28,8 @@ class YoutubeDLArchiver(Archiver):
            "live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."},
            "proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"},
            "end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success'; if False, this archiver will not return a 'success' stage. This is useful when yt-dlp archives a video but ignores other types of content, like images or text-only pages, that subsequent archivers can retrieve."},
+           'allow_playlist': {"default": False, "help": "If True, will also download playlists; set to False if the expectation is to download a single video."},
+           "max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
        }

    def download(self, item: Metadata) -> Metadata:

@@ -35,11 +39,11 @@ class YoutubeDLArchiver(Archiver):
            logger.debug('Using Facebook cookie')
            yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie

-       ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': True, 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy}
+       ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist, 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
        ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"

        try:
-           # don'd download since it can be a live stream
+           # don't download since it can be a live stream
            info = ydl.extract_info(url, download=False)
            if info.get('is_live', False) and not self.livestreams:
                logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting")

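For context, the three options that change in `ydl_options` are standard yt-dlp parameters: `noplaylist` restricts extraction to a single video when a URL could refer to both a video and a playlist, `playlistend` caps how many playlist entries are expanded, and `max_downloads` stops after that many actual downloads. A minimal sketch of wiring them up from the archiver's settings; the values, the example URL, and the reduced option set are illustrative (the real code also sets `outtmpl`, `proxy`, subtitle options, etc.):

```python
import yt_dlp

allow_playlist = True
max_downloads = 5          # illustrative integer limit; the config default here is "inf"

ydl_options = {
    "quiet": True,
    "noplaylist": not allow_playlist,     # single-video mode unless playlists are allowed
    "playlistend": max_downloads,         # stop expanding the playlist after N entries
    "max_downloads": max_downloads,       # hard stop after N actual downloads
}

with yt_dlp.YoutubeDL(ydl_options) as ydl:
    # metadata only; download=True would fetch up to max_downloads files
    # (replace the placeholder URL with a real playlist to run this)
    info = ydl.extract_info("https://www.youtube.com/playlist?list=EXAMPLE", download=False)
    for entry in info.get("entries") or []:
        print(entry.get("title"))
```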
@@ -64,13 +68,17 @@ class YoutubeDLArchiver(Archiver):

        result = Metadata()
        result.set_title(info.get("title"))
        if "description" in info: result.set_content(info["description"])
        for entry in entries:
            try:
                filename = ydl.prepare_filename(entry)
                if not os.path.exists(filename):
                    filename = filename.split('.')[0] + '.mkv'
-               new_media = Media(filename).set("duration", info.get("duration"))
+               new_media = Media(filename)
+               for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
+                   if x in entry: new_media.set(x, entry[x])

                # read text from subtitles if enabled
                if self.subtitles:
                    for lang, val in (info.get('requested_subtitles') or {}).items():
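The loop over `entries` reflects how yt-dlp reports results: `extract_info` returns a single info dict for one video but a dict with an `entries` list when the URL expands to several videos, and fields such as `duration`, `fulltitle` and `upload_date` live on each entry. A small sketch of normalising both shapes before copying the selected fields; the helper names and the fake playlist dict are illustrative:

```python
def to_entries(info: dict) -> list[dict]:
    """Return per-video dicts whether yt-dlp gave us a single video or a playlist."""
    if info.get("entries") is not None:     # playlist / channel / multi-video page
        return [e for e in info["entries"] if e]
    return [info]                           # single video: the info dict is the entry

def pick_fields(entry: dict) -> dict:
    wanted = ["duration", "original_url", "fulltitle", "description", "upload_date"]
    return {k: entry[k] for k in wanted if k in entry}

# usage with a fake playlist-shaped info dict
fake_info = {"title": "my playlist", "entries": [{"fulltitle": "clip 1", "duration": 12}, None]}
print([pick_fields(e) for e in to_entries(fake_info)])
# -> [{'fulltitle': 'clip 1', 'duration': 12}]
```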
@@ -3,7 +3,7 @@ _MAJOR = "0"
_MINOR = "9"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
-_PATCH = "7"
+_PATCH = "11"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""