kopia lustrzana https://github.com/bellingcat/auto-archiver
adds tagged posts and better parsing
rodzic
bfb35a43a9
commit
f0158ffd9c
|
@ -73,9 +73,9 @@ class InstagramAPIArchiver(Archiver):
|
||||||
if type(d) == list: return [self.cleanup_dict(v) for v in d]
|
if type(d) == list: return [self.cleanup_dict(v) for v in d]
|
||||||
if type(d) != dict: return d
|
if type(d) != dict: return d
|
||||||
return {
|
return {
|
||||||
k: self.cleanup_dict(v) if type(v) in [dict, list] else v
|
k: clean_v
|
||||||
for k, v in d.items()
|
for k, v in d.items()
|
||||||
if v not in [0.0, 0, [], {}, "", None, "null"] and
|
if (clean_v := self.cleanup_dict(v)) not in [0.0, 0, [], {}, "", None, "null"] and
|
||||||
k not in ["x", "y", "width", "height"]
|
k not in ["x", "y", "width", "height"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,9 +93,6 @@ class InstagramAPIArchiver(Archiver):
|
||||||
|
|
||||||
if self.full_profile:
|
if self.full_profile:
|
||||||
user_id = user.get("pk")
|
user_id = user.get("pk")
|
||||||
# download all posts
|
|
||||||
self.download_all_posts(result, user_id)
|
|
||||||
|
|
||||||
# download all stories
|
# download all stories
|
||||||
try:
|
try:
|
||||||
stories = self._download_stories_reusable(result, username)
|
stories = self._download_stories_reusable(result, username)
|
||||||
|
@ -104,6 +101,12 @@ class InstagramAPIArchiver(Archiver):
|
||||||
result.append("errors", f"Error downloading stories for {username}")
|
result.append("errors", f"Error downloading stories for {username}")
|
||||||
logger.error(f"Error downloading stories for {username}: {e}")
|
logger.error(f"Error downloading stories for {username}: {e}")
|
||||||
|
|
||||||
|
# download all posts
|
||||||
|
self.download_all_posts(result, user_id)
|
||||||
|
|
||||||
|
# download all tagged
|
||||||
|
self.download_all_tagged(result, user_id)
|
||||||
|
|
||||||
# download all highlights
|
# download all highlights
|
||||||
try:
|
try:
|
||||||
count_highlights = 0
|
count_highlights = 0
|
||||||
|
@ -120,6 +123,7 @@ class InstagramAPIArchiver(Archiver):
|
||||||
result.append("errors", f"Error downloading highlights for {username}")
|
result.append("errors", f"Error downloading highlights for {username}")
|
||||||
logger.error(f"Error downloading highlights for {username}: {e}")
|
logger.error(f"Error downloading highlights for {username}: {e}")
|
||||||
|
|
||||||
|
|
||||||
result.set_url(url) # reset as scrape_item modifies it
|
result.set_url(url) # reset as scrape_item modifies it
|
||||||
return result.success("insta profile")
|
return result.success("insta profile")
|
||||||
|
|
||||||
|
@ -201,6 +205,28 @@ class InstagramAPIArchiver(Archiver):
|
||||||
post_count+=1
|
post_count+=1
|
||||||
result.set("#posts", post_count)
|
result.set("#posts", post_count)
|
||||||
|
|
||||||
|
def download_all_tagged(self, result: Metadata, user_id: str):
|
||||||
|
next_page_id = ""
|
||||||
|
pbar = tqdm(desc="downloading tagged posts")
|
||||||
|
|
||||||
|
tagged_count = 0
|
||||||
|
while next_page_id != None:
|
||||||
|
resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
|
||||||
|
posts = resp.get("response", {}).get("items", [])
|
||||||
|
if not len(posts): break
|
||||||
|
next_page_id = resp.get("next_page_id")
|
||||||
|
|
||||||
|
logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")
|
||||||
|
|
||||||
|
for p in posts:
|
||||||
|
try: self.scrape_item(result, p, "tagged")
|
||||||
|
except Exception as e:
|
||||||
|
result.append("errors", f"Error downloading tagged post {p.get('id')}")
|
||||||
|
logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}")
|
||||||
|
pbar.update(1)
|
||||||
|
tagged_count+=1
|
||||||
|
result.set("#tagged", tagged_count)
|
||||||
|
|
||||||
|
|
||||||
### reusable parsing utils below
|
### reusable parsing utils below
|
||||||
|
|
||||||
|
@ -220,7 +246,7 @@ class InstagramAPIArchiver(Archiver):
|
||||||
if code := item.get("code"):
|
if code := item.get("code"):
|
||||||
result.set("url", f"https://www.instagram.com/p/{code}/")
|
result.set("url", f"https://www.instagram.com/p/{code}/")
|
||||||
|
|
||||||
resources = item.get("resources", [])
|
resources = item.get("resources", item.get("carousel_media", []))
|
||||||
item, media, media_id = self.scrape_media(item, context)
|
item, media, media_id = self.scrape_media(item, context)
|
||||||
# if resources are present take the main media from the first resource
|
# if resources are present take the main media from the first resource
|
||||||
if not media and len(resources):
|
if not media and len(resources):
|
||||||
|
@ -242,7 +268,7 @@ class InstagramAPIArchiver(Archiver):
|
||||||
def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]:
|
def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]:
|
||||||
# remove unnecessary info
|
# remove unnecessary info
|
||||||
if self.minimize_json_output:
|
if self.minimize_json_output:
|
||||||
for k in ["image_versions", "video_versions", "video_dash_manifest"]:
|
for k in ["image_versions", "video_versions", "video_dash_manifest", "image_versions2", "video_versions2"]:
|
||||||
if k in item: del item[k]
|
if k in item: del item[k]
|
||||||
item = self.cleanup_dict(item)
|
item = self.cleanup_dict(item)
|
||||||
|
|
||||||
|
@ -253,19 +279,24 @@ class InstagramAPIArchiver(Archiver):
|
||||||
|
|
||||||
# retrieve video info
|
# retrieve video info
|
||||||
best_id = item.get('id', item.get('pk'))
|
best_id = item.get('id', item.get('pk'))
|
||||||
taken_at = item.get("taken_at")
|
taken_at = item.get("taken_at", item.get("taken_at_ts"))
|
||||||
code = item.get("code")
|
code = item.get("code")
|
||||||
|
caption_text = item.get("caption_text")
|
||||||
|
if "carousel_media" in item: del item["carousel_media"]
|
||||||
|
|
||||||
if video_url := item.get("video_url"):
|
if video_url := item.get("video_url"):
|
||||||
filename = self.download_from_url(video_url, verbose=False)
|
filename = self.download_from_url(video_url, verbose=False)
|
||||||
video_media = Media(filename=filename)
|
video_media = Media(filename=filename)
|
||||||
if taken_at: video_media.set("date", taken_at)
|
if taken_at: video_media.set("date", taken_at)
|
||||||
if code: video_media.set("url", f"https://www.instagram.com/p/{code}")
|
if code: video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||||
|
if caption_text: video_media.set("text", caption_text)
|
||||||
video_media.set("preview", [image_media])
|
video_media.set("preview", [image_media])
|
||||||
video_media.set("data", [item])
|
video_media.set("data", [item])
|
||||||
return item, video_media, f"{context or 'video'} {best_id}"
|
return item, video_media, f"{context or 'video'} {best_id}"
|
||||||
elif image_media:
|
elif image_media:
|
||||||
if taken_at: image_media.set("date", taken_at)
|
if taken_at: image_media.set("date", taken_at)
|
||||||
if code: image_media.set("url", f"https://www.instagram.com/p/{code}")
|
if code: image_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||||
|
if caption_text: image_media.set("text", caption_text)
|
||||||
image_media.set("data", [item])
|
image_media.set("data", [item])
|
||||||
return item, image_media, f"{context or 'image'} {best_id}"
|
return item, image_media, f"{context or 'image'} {best_id}"
|
||||||
|
|
||||||
|
|
Ładowanie…
Reference in New Issue