From 1def8bb03dde9a2609f79ae91e8a94d33692256a Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Wed, 18 Jan 2023 16:16:23 +0000 Subject: [PATCH] instagram archiver --- src/archivers/__init__.py | 5 +- src/archivers/instagram_archiverv2.py | 144 ++++++++++++++++++++++++ src/archivers/telethon_archiverv2.py | 4 +- src/archivers/twitter_api_archiverv2.py | 5 + src/steps/step.py | 8 ++ 5 files changed, 162 insertions(+), 4 deletions(-) create mode 100644 src/archivers/instagram_archiverv2.py diff --git a/src/archivers/__init__.py b/src/archivers/__init__.py index f25668d..d256f8c 100644 --- a/src/archivers/__init__.py +++ b/src/archivers/__init__.py @@ -9,8 +9,9 @@ from .youtubedl_archiver import YoutubeDLArchiver # from .twitter_archiver import TwitterArchiver from .vk_archiver import VkArchiver # from .twitter_api_archiver import TwitterApiArchiver -from .instagram_archiver import InstagramArchiver +# from .instagram_archiver import InstagramArchiver from .telethon_archiverv2 import TelethonArchiver from .twitter_archiverv2 import TwitterArchiver -from .twitter_api_archiverv2 import TwitterApiArchiver \ No newline at end of file +from .twitter_api_archiverv2 import TwitterApiArchiver +from .instagram_archiverv2 import InstagramArchiver \ No newline at end of file diff --git a/src/archivers/instagram_archiverv2.py b/src/archivers/instagram_archiverv2.py new file mode 100644 index 0000000..2ca2e80 --- /dev/null +++ b/src/archivers/instagram_archiverv2.py @@ -0,0 +1,144 @@ +import re, os, shutil, html, traceback +import instaloader # https://instaloader.github.io/as-module.html +from loguru import logger + +from metadata import Metadata +from media import Media +from .archiver import Archiverv2 + + +class InstagramArchiver(Archiverv2): + """ + Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...) 
+    """
+    name = "instagram_archiver"
+
+    # NB: post regex should be tested before profile
+    # https://regex101.com/r/MGPquX/1
+    post_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(?:p|reel)\/(\w+)")
+    # https://regex101.com/r/6Wbsxa/1
+    profile_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(\w+)")
+    # TODO: links to stories
+
+    def __init__(self, config: dict) -> None:
+        super().__init__(config)
+        # TODO: refactor how configuration validation is done
+        self.assert_valid_string("username")
+        self.assert_valid_string("password")
+        self.assert_valid_string("download_folder")
+        self.assert_valid_string("session_file")
+        self.insta = instaloader.Instaloader(
+            download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
+        )
+        try:
+            self.insta.load_session_from_file(self.username, self.session_file)
+        except Exception as e:
+            logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
+            try:  # the session file could not be loaded: fall back to a username/password login
+                self.insta.login(self.username, self.password)
+                # TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
+                self.insta.save_session_to_file(self.session_file)
+            except Exception as e2:
+                logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
+
+    @staticmethod
+    def configs() -> dict:
+        return {
+            "username": {"default": None, "help": "a valid Instagram username"},
+            "password": {"default": None, "help": "the corresponding Instagram account password"},
+            "download_folder": {"default": "instaloader", "help": "name of a folder to temporarily download content to"},
+            "session_file": {"default": "secrets/instaloader.session", "help": "path to the instagram session which saves session credentials"},
+            # TODO: fine-grain
+            # "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
+        }
+
+    def download(self, item: Metadata) -> Metadata:  # returns None when the URL matches neither pattern or the download raises
+        url = item.get_url()
+
+        # detect URLs that we definitely cannot handle
+        post_matches = self.post_pattern.findall(url)
+        profile_matches = self.profile_pattern.findall(url)
+
+        # return if not a valid instagram link
+        if not post_matches and not profile_matches: return
+
+        result = None
+        try:
+            os.makedirs(self.download_folder, exist_ok=True)
+            # process if post
+            if post_matches:
+                result = self.download_post(url, post_matches[0])
+            # process if profile
+            elif profile_matches:
+                result = self.download_profile(url, profile_matches[0])
+        except Exception as e:
+            logger.error(f"Failed to download with instagram archiver due to: {e}, make sure your account credentials are valid.")
+        finally:  # the temporary download folder is removed whether or not the download succeeded
+            shutil.rmtree(self.download_folder, ignore_errors=True)
+        return result
+
+    def download_post(self, url: str, post_id: str) -> Metadata:  # implicitly returns None when insta.download_post is falsy
+        logger.debug(f"Instagram {post_id=} detected in {url=}")
+
+        post = instaloader.Post.from_shortcode(self.insta.context, post_id)
+        if self.insta.download_post(post, target=post.owner_username):
+            return self.process_downloads(url, post.title, post._asdict(), post.date)
+
+    def download_profile(self, url: str, username: str) -> Metadata:
+        # gets posts, posts where username is tagged, IGTV posts, stories, and highlights; each failure is logged and skipped
+        logger.debug(f"Instagram {username=} detected in {url=}")
+
+        profile = instaloader.Profile.from_username(self.insta.context, username)
+        try:
+            for post in profile.get_posts():
+                try: self.insta.download_post(post, target=f"profile_post_{post.owner_username}")
+                except Exception as e: logger.error(f"Failed to download post: {post.shortcode}: {e}")
+        except Exception as e: logger.error(f"Failed profile.get_posts: {e}")
+
+        try:
+            for post in profile.get_tagged_posts():
+                try: self.insta.download_post(post, target=f"tagged_post_{post.owner_username}")
+                except Exception as e: logger.error(f"Failed to download tagged post: {post.shortcode}: {e}")
+        except Exception as e: logger.error(f"Failed profile.get_tagged_posts: {e}")
+
+        try:
+            for post in profile.get_igtv_posts():
+                try: self.insta.download_post(post, target=f"igtv_post_{post.owner_username}")
+                except Exception as e: logger.error(f"Failed to download igtv post: {post.shortcode}: {e}")
+        except Exception as e: logger.error(f"Failed profile.get_igtv_posts: {e}")
+
+        try:
+            for story in self.insta.get_stories([profile.userid]):
+                for item in story.get_items():
+                    try: self.insta.download_storyitem(item, target=f"story_item_{story.owner_username}")
+                    except Exception as e: logger.error(f"Failed to download story item: {item}: {e}")
+        except Exception as e: logger.error(f"Failed get_stories: {e}")
+
+        try:
+            for highlight in self.insta.get_highlights(profile.userid):
+                for item in highlight.get_items():
+                    try: self.insta.download_storyitem(item, target=f"highlight_item_{highlight.owner_username}")
+                    except Exception as e: logger.error(f"Failed to download highlight item: {item}: {e}")
+        except Exception as e: logger.error(f"Failed get_highlights: {e}")
+
+        return self.process_downloads(url, f"@{username}", profile._asdict(), None)
+
+    def process_downloads(self, url, title, content, date):  # returns None (after logging) if collecting the media raises
+        result = Metadata()
+        result.set_title(title).set_content(str(content)).set_timestamp(date)
+
+        try:
+            all_media = []
+            for f in os.listdir(self.download_folder):
+                if os.path.isfile((filename := os.path.join(self.download_folder, f))):
+                    if filename.endswith(".txt"): continue
+                    all_media.append(Media(filename))
+
+            assert len(all_media) > 1, "No uploaded media found"  # NOTE(review): "> 1" demands at least two non-.txt files - confirm this is intended
+            all_media.sort(key=lambda m: m.filename, reverse=True)
+            for m in all_media:
+                result.add_media(m)
+
+            return result.success("instagram")
+        except Exception as e:
+            logger.error(f"Could not fetch instagram post {url} due to: {e}")
diff --git a/src/archivers/telethon_archiverv2.py b/src/archivers/telethon_archiverv2.py index
90de5da..094b004 100644 --- a/src/archivers/telethon_archiverv2.py +++ b/src/archivers/telethon_archiverv2.py @@ -19,8 +19,8 @@ class TelethonArchiver(Archiverv2): def __init__(self, config: dict) -> None: super().__init__(config) - assert self.api_id is not None and type(self.api_id) == str and len(self.api_id) > 0, f"invalid telethon api_id value ({self.api_id}) should be a valid string" - assert self.api_hash is not None and type(self.api_hash) == str and len(self.api_hash) > 0, f"invalid telethon api_hash value ({self.api_hash}) should be a valid string" + self.assert_valid_string("api_id") + self.assert_valid_string("api_hash") self.client = TelegramClient(self.session_file, self.api_id, self.api_hash) diff --git a/src/archivers/twitter_api_archiverv2.py b/src/archivers/twitter_api_archiverv2.py index c95795a..5cfbc0d 100644 --- a/src/archivers/twitter_api_archiverv2.py +++ b/src/archivers/twitter_api_archiverv2.py @@ -20,8 +20,13 @@ class TwitterApiArchiver(TwitterArchiver, Archiverv2): super().__init__(config) if self.bearer_token: + self.assert_valid_string("bearer_token") self.api = Api(bearer_token=self.bearer_token) elif self.consumer_key and self.consumer_secret and self.access_token and self.access_secret: + self.assert_valid_string("consumer_key") + self.assert_valid_string("consumer_secret") + self.assert_valid_string("access_token") + self.assert_valid_string("access_secret") self.api = Api( consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, access_token=self.access_token, access_secret=self.access_secret) assert hasattr(self, "api") and self.api is not None, "Missing Twitter API configurations, please provide either bearer_token OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver." 
diff --git a/src/steps/step.py b/src/steps/step.py index a8bad38..e80437b 100644 --- a/src/steps/step.py +++ b/src/steps/step.py @@ -29,3 +29,11 @@ class Step(ABC): if sub.name == name: return sub(config) raise ClassFoundException(f"Unable to initialize STEP with {name=}, check your configuration file/step names.")
+
+    def assert_valid_string(self, prop: str) -> None:
+        """
+        receives a property name and ensures it exists and is a valid non-empty string, raises an AssertionError if not (raised explicitly so that `python -O` cannot strip the check)
+        """
+        if not hasattr(self, prop): raise AssertionError(f"property {prop} not found")
+        s = getattr(self, prop)
+        if not (isinstance(s, str) and len(s) > 0): raise AssertionError(f"invalid property {prop} value '{s}', it should be a valid string")