Closes #166: adds the story URL feature to the Telethon extractor

pull/331/head
msramalho 2025-06-18 17:37:44 +01:00
rodzic 592dc30415
commit 12b457706b
Nie znaleziono w bazie danych klucza dla tego podpisu
2 zmienionych plików z 104 dodań i 44 usunięć

Wyświetl plik

@ -5,6 +5,7 @@ import time
from pathlib import Path from pathlib import Path
from datetime import date from datetime import date
from telethon import functions
from telethon.sync import TelegramClient from telethon.sync import TelegramClient
from telethon.errors import ChannelInvalidError from telethon.errors import ChannelInvalidError
from telethon.tl.functions.messages import ImportChatInviteRequest from telethon.tl.functions.messages import ImportChatInviteRequest
@ -24,7 +25,7 @@ from auto_archiver.utils import random_str
class TelethonExtractor(Extractor): class TelethonExtractor(Extractor):
valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)") valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+?)(\/s){0,1}\/(\d+)")
invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)") invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")
def setup(self) -> None: def setup(self) -> None:
@ -122,13 +123,34 @@ class TelethonExtractor(Extractor):
is_private = match.group(1) == "/c" is_private = match.group(1) == "/c"
chat = int(match.group(2)) if is_private else match.group(2) chat = int(match.group(2)) if is_private else match.group(2)
post_id = int(match.group(3)) is_story = match.group(3) == "/s"
post_id = int(match.group(4))
result = Metadata() result = Metadata()
# NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token) # NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token)
with self.client.start(): with self.client.start():
# with self.client.start(bot_token=self.bot_token): # with self.client.start(bot_token=self.bot_token):
if is_story:
try:
stories = self.client(functions.stories.GetStoriesByIDRequest(peer=chat, id=[post_id]))
if not stories.stories:
logger.info(f"No stories found for {url}, possibly it's private or the story has expired.")
return False
story = stories.stories[0]
logger.debug(f"TELETHON got story {story.id=} {story.date=} {story.expire_date=}")
result.set_timestamp(story.date).set("views", story.views.to_dict()).set(
"expire_date", story.expire_date
)
# download the story media
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{post_id}", str(story.id))
if filename := self.client.download_media(story.media, filename_dest):
result.add_media(Media(filename))
except Exception as e:
logger.error(f"Error fetching story {post_id} from {chat}: {e}")
return False
else:
try: try:
post = self.client.get_messages(chat, ids=post_id) post = self.client.get_messages(chat, ids=post_id)
except ValueError as e: except ValueError as e:
@ -140,15 +162,13 @@ class TelethonExtractor(Extractor):
) )
return False return False
logger.debug(f"TELETHON GOT POST {post=}") logger.debug(f"TELETHON got post {post=}")
if post is None: if post is None:
return False return False
media_posts = self._get_media_posts_in_group(chat, post) media_posts = self._get_media_posts_in_group(chat, post)
logger.debug(f"got {len(media_posts)=} for {url=}") logger.debug(f"got {len(media_posts)=} for {url=}")
tmp_dir = self.tmp_dir
group_id = post.grouped_id if post.grouped_id is not None else post.id group_id = post.grouped_id if post.grouped_id is not None else post.id
title = post.message title = post.message
for mp in media_posts: for mp in media_posts:
@ -160,15 +180,19 @@ class TelethonExtractor(Extractor):
other_media_urls = [ other_media_urls = [
e.url e.url
for e in mp.entities for e in mp.entities
if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image", "audio"] if hasattr(e, "url")
and e.url
and self._guess_file_type(e.url) in ["video", "image", "audio"]
] ]
if len(other_media_urls): if len(other_media_urls):
logger.debug(f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}") logger.debug(
f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}"
)
for i, om_url in enumerate(other_media_urls): for i, om_url in enumerate(other_media_urls):
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}") filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
result.add_media(Media(filename=filename), id=f"{group_id}_{i}") result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
filename_dest = os.path.join(tmp_dir, f"{chat}_{group_id}", str(mp.id)) filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))
filename = self.client.download_media(mp.media, filename_dest) filename = self.client.download_media(mp.media, filename_dest)
if not filename: if not filename:
logger.debug(f"Empty media found, skipping {str(mp)=}") logger.debug(f"Empty media found, skipping {str(mp)=}")

Wyświetl plik

@ -3,6 +3,8 @@ from datetime import date
import pytest import pytest
from auto_archiver.modules.telethon_extractor.telethon_extractor import TelethonExtractor
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def mock_client_setup(mocker): def mock_client_setup(mocker):
@ -24,3 +26,37 @@ def test_setup_fails_clear_session_file(get_lazy_module, tmp_path, mocker):
assert session_file.exists() assert session_file.exists()
assert f"telethon-{date.today().strftime('%Y-%m-%d')}" in lazy_module._instance.session_file assert f"telethon-{date.today().strftime('%Y-%m-%d')}" in lazy_module._instance.session_file
assert os.path.exists(lazy_module._instance.session_file + ".session") assert os.path.exists(lazy_module._instance.session_file + ".session")
@pytest.mark.parametrize(
    "url,expected",
    [
        ("https://t.me/channel/123", True),
        ("https://t.me/c/123/456", True),
        ("https://t.me/channel/s/789", True),
        ("https://t.me/c/123/s/456", True),
        ("https://t.me/with_single/1234567?single", True),
        ("https://t.me/invalid", False),
        ("https://example.com/nottelegram/123", False),
    ],
)
def test_valid_url_regex(url, expected, get_lazy_module):
    """Check which URLs ``TelethonExtractor.valid_url`` finds a match in.

    The cases cover public and ``/c/`` post links, their ``/s/`` story
    variants, a link with a trailing ``?single`` query, a t.me link with no
    numeric id, and a non-Telegram host.

    ``get_lazy_module`` is requested as a fixture but is not referenced in
    the body.
    """
    # ``search`` yields a match object or None; compare presence to the flag.
    found = TelethonExtractor.valid_url.search(url) is not None
    assert found == expected
@pytest.mark.parametrize(
    "invite,expected",
    [
        ("t.me/joinchat/AAAAAE", True),
        ("t.me/+AAAAAE", True),
        ("t.me/AAAAAE", True),
        ("https://t.me/joinchat/AAAAAE", True),
        ("https://t.me/+AAAAAE", True),
        ("https://t.me/AAAAAE", True),
        ("https://example.com/AAAAAE", False),
    ],
)
def test_invite_pattern_regex(invite, expected, get_lazy_module):
    """Check which strings ``TelethonExtractor.invite_pattern`` finds a match in.

    The cases cover t.me invite links with and without the ``https://``
    scheme, in ``joinchat/``, ``+`` and bare-code forms, plus a link on a
    different host that must not match.

    ``get_lazy_module`` is requested as a fixture but is not referenced in
    the body.
    """
    # ``search`` yields a match object or None; compare presence to the flag.
    found = TelethonExtractor.invite_pattern.search(invite) is not None
    assert found == expected