diff --git a/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py b/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py index 279dccd..2643b32 100644 --- a/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py +++ b/src/auto_archiver/modules/telethon_extractor/telethon_extractor.py @@ -5,6 +5,7 @@ import time from pathlib import Path from datetime import date +from telethon import functions from telethon.sync import TelegramClient from telethon.errors import ChannelInvalidError from telethon.tl.functions.messages import ImportChatInviteRequest @@ -24,7 +25,7 @@ from auto_archiver.utils import random_str class TelethonExtractor(Extractor): - valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)") + valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+?)(\/s){0,1}\/(\d+)") invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)") def setup(self) -> None: @@ -122,62 +123,85 @@ class TelethonExtractor(Extractor): is_private = match.group(1) == "/c" chat = int(match.group(2)) if is_private else match.group(2) - post_id = int(match.group(3)) + is_story = match.group(3) == "/s" + post_id = int(match.group(4)) result = Metadata() # NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token) with self.client.start(): # with self.client.start(bot_token=self.bot_token): - try: - post = self.client.get_messages(chat, ids=post_id) - except ValueError as e: - logger.error(f"Could not fetch telegram {url} possibly it's private: {e}") - return False - except ChannelInvalidError as e: - logger.error( - f"Could not fetch telegram {url}. This error may be fixed if you setup a bot_token in addition to api_id and api_hash (but then private channels will not be archived, we need to update this logic to handle both): {e}" - ) - return False + if is_story: + try: + stories = self.client(functions.stories.GetStoriesByIDRequest(peer=chat, id=[post_id])) + if not stories.stories: + logger.info(f"No stories found for {url}, possibly it's private or the story has expired.") + return False + story = stories.stories[0] + logger.debug(f"TELETHON got story {story.id=} {story.date=} {story.expire_date=}") + result.set_timestamp(story.date).set("views", story.views.to_dict()).set( + "expire_date", story.expire_date + ) - logger.debug(f"TELETHON GOT POST {post=}") - if post is None: - return False + # download the story media + filename_dest = os.path.join(self.tmp_dir, f"{chat}_{post_id}", str(story.id)) + if filename := self.client.download_media(story.media, filename_dest): + result.add_media(Media(filename)) + except Exception as e: + logger.error(f"Error fetching story {post_id} from {chat}: {e}") + return False + else: + try: + post = self.client.get_messages(chat, ids=post_id) + except ValueError as e: + logger.error(f"Could not fetch telegram {url} possibly it's private: {e}") + return False + except ChannelInvalidError as e: + logger.error( + f"Could not fetch telegram {url}. This error may be fixed if you setup a bot_token in addition to api_id and api_hash (but then private channels will not be archived, we need to update this logic to handle both): {e}" + ) + return False - media_posts = self._get_media_posts_in_group(chat, post) - logger.debug(f"got {len(media_posts)=} for {url=}") + logger.debug(f"TELETHON got post {post=}") + if post is None: + return False - tmp_dir = self.tmp_dir + media_posts = self._get_media_posts_in_group(chat, post) + logger.debug(f"got {len(media_posts)=} for {url=}") - group_id = post.grouped_id if post.grouped_id is not None else post.id - title = post.message - for mp in media_posts: - if len(mp.message) > len(title): - title = mp.message # save the longest text found (usually only 1) + group_id = post.grouped_id if post.grouped_id is not None else post.id + title = post.message + for mp in media_posts: + if len(mp.message) > len(title): + title = mp.message # save the longest text found (usually only 1) - # media can also be in entities - if mp.entities: - other_media_urls = [ - e.url - for e in mp.entities - if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image", "audio"] - ] - if len(other_media_urls): - logger.debug(f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}") - for i, om_url in enumerate(other_media_urls): - filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}") - result.add_media(Media(filename=filename), id=f"{group_id}_{i}") + # media can also be in entities + if mp.entities: + other_media_urls = [ + e.url + for e in mp.entities + if hasattr(e, "url") + and e.url + and self._guess_file_type(e.url) in ["video", "image", "audio"] + ] + if len(other_media_urls): + logger.debug( + f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}" + ) + for i, om_url in enumerate(other_media_urls): + filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}") + result.add_media(Media(filename=filename), id=f"{group_id}_{i}") - filename_dest = os.path.join(tmp_dir, f"{chat}_{group_id}", str(mp.id)) - filename = self.client.download_media(mp.media, filename_dest) - if not filename: - logger.debug(f"Empty media found, skipping {str(mp)=}") - continue - result.add_media(Media(filename)) + filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id)) + filename = self.client.download_media(mp.media, filename_dest) + if not filename: + logger.debug(f"Empty media found, skipping {str(mp)=}") + continue + result.add_media(Media(filename)) - result.set_title(title).set_timestamp(post.date).set("api_data", post.to_dict()) - if post.message != title: - result.set_content(post.message) + result.set_title(title).set_timestamp(post.date).set("api_data", post.to_dict()) + if post.message != title: + result.set_content(post.message) return result.success("telethon") def _get_media_posts_in_group(self, chat, original_post, max_amp=10): diff --git a/tests/extractors/test_telethon_extractor.py b/tests/extractors/test_telethon_extractor.py index 0a2d5c8..a1a5aa9 100644 --- a/tests/extractors/test_telethon_extractor.py +++ b/tests/extractors/test_telethon_extractor.py @@ -3,6 +3,8 @@ from datetime import date import pytest +from auto_archiver.modules.telethon_extractor.telethon_extractor import TelethonExtractor + @pytest.fixture(autouse=True) def mock_client_setup(mocker): @@ -24,3 +26,37 @@ def test_setup_fails_clear_session_file(get_lazy_module, tmp_path, mocker): assert session_file.exists() assert f"telethon-{date.today().strftime('%Y-%m-%d')}" in lazy_module._instance.session_file assert os.path.exists(lazy_module._instance.session_file + ".session") + + +@pytest.mark.parametrize( + "url,expected", + [ + ("https://t.me/channel/123", True), + ("https://t.me/c/123/456", True), + ("https://t.me/channel/s/789", True), + ("https://t.me/c/123/s/456", True), + ("https://t.me/with_single/1234567?single", True), + ("https://t.me/invalid", False), + ("https://example.com/nottelegram/123", False), + ], +) +def test_valid_url_regex(url, expected, get_lazy_module): + match = TelethonExtractor.valid_url.search(url) + assert bool(match) == expected + + +@pytest.mark.parametrize( + "invite,expected", + [ + ("t.me/joinchat/AAAAAE", True), + ("t.me/+AAAAAE", True), + ("t.me/AAAAAE", True), + ("https://t.me/joinchat/AAAAAE", True), + ("https://t.me/+AAAAAE", True), + ("https://t.me/AAAAAE", True), + ("https://example.com/AAAAAE", False), + ], +) +def test_invite_pattern_regex(invite, expected, get_lazy_module): + match = TelethonExtractor.invite_pattern.search(invite) + assert bool(match) == expected