telethon_archiver working for multiple media

pull/72/head
msramalho 2022-12-14 15:37:34 +00:00
rodzic b3860cfec1
commit 53ffa2d4ae
5 zmienionych plików z 125 dodań i 31 usunięć

Wyświetl plik

@@ -3,6 +3,7 @@ from abc import abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from metadata import Metadata from metadata import Metadata
from steps.step import Step from steps.step import Step
import mimetypes, requests
@dataclass @dataclass
@@ -12,9 +13,9 @@ class Archiverv2(Step):
def __init__(self, config: dict) -> None: def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called # without this STEP.__init__ is not called
super().__init__(config) super().__init__(config)
# self.setup()
# only for typing... # only for typing...
def init(name: str, config: dict) -> Archiverv2: def init(name: str, config: dict) -> Archiverv2:
return Step.init(name, config, Archiverv2) return Step.init(name, config, Archiverv2)
@@ -22,5 +23,23 @@ class Archiverv2(Step):
# used when archivers need to login or do other one-time setup # used when archivers need to login or do other one-time setup
pass pass
def _guess_file_type(self, path: str) -> str:
"""
Receives a URL or filename and returns global mimetype like 'image' or 'video'
see https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
"""
mime = mimetypes.guess_type(path)[0]
if mime is not None:
return mime.split("/")[0]
return ""
def download_from_url(self, url:str, to_filename:str) -> None:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
d = requests.get(url, headers=headers)
with open(to_filename, 'wb') as f:
f.write(d.content)
@abstractmethod @abstractmethod
def download(self, item: Metadata) -> Metadata: pass def download(self, item: Metadata) -> Metadata: pass

Wyświetl plik

@@ -7,8 +7,7 @@ from telethon.tl.functions.messages import ImportChatInviteRequest
from telethon.errors.rpcerrorlist import UserAlreadyParticipantError, FloodWaitError, InviteRequestSentError, InviteHashExpiredError from telethon.errors.rpcerrorlist import UserAlreadyParticipantError, FloodWaitError, InviteRequestSentError, InviteHashExpiredError
from loguru import logger from loguru import logger
from tqdm import tqdm from tqdm import tqdm
import re, time, json import re, time, json, os
class TelethonArchiver(Archiverv2): class TelethonArchiver(Archiverv2):
@@ -38,6 +37,10 @@ class TelethonArchiver(Archiverv2):
} }
def setup(self) -> None: def setup(self) -> None:
"""
1. trigger login process for telegram or proceed if already saved in a session file
2. joins channel_invites where needed
"""
logger.info(f"SETUP {self.name} checking login...") logger.info(f"SETUP {self.name} checking login...")
with self.client.start(): pass with self.client.start(): pass
@@ -56,11 +59,11 @@
channel_id = channel_invite.get("id", False) channel_id = channel_invite.get("id", False)
invite = channel_invite["invite"] invite = channel_invite["invite"]
if (match := self.invite_pattern.search(invite)): if (match := self.invite_pattern.search(invite)):
try: try:
if channel_id: if channel_id:
ent = self.client.get_entity(int(channel_id)) # fails if not a member ent = self.client.get_entity(int(channel_id)) # fails if not a member
else: else:
ent = self.client.get_entity(invite) # fails if not a member ent = self.client.get_entity(invite) # fails if not a member
logger.warning(f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting.") logger.warning(f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting.")
except ValueError as e: except ValueError as e:
logger.info(f"joining new channel {invite=}") logger.info(f"joining new channel {invite=}")
@@ -80,35 +83,80 @@
continue continue
else: else:
logger.warning(f"Invalid invite link {invite}") logger.warning(f"Invalid invite link {invite}")
i+=1 i += 1
pbar.update() pbar.update()
def download(self, item: Metadata) -> Metadata: def download(self, item: Metadata) -> Metadata:
url = self.get_url(item) url = item.get_url()
print(f"downloading {url=}") print(f"downloading {url=}")
# detect URLs that we definitely cannot handle # detect URLs that we definitely cannot handle
match = self.link_pattern.search(url) match = self.link_pattern.search(url)
if not match: return False if not match: return False
# app will ask (stall for user input!) for phone number and auth code if anon.session not found is_private = match.group(1) == "/c"
# TODO: not using bot_token since then private channels cannot be archived chat = int(match.group(2)) if is_private else match.group(2)
# with self.client.start(bot_token=self.bot_token): post_id = int(match.group(3))
with self.client.start():
# self.client(ImportChatInviteRequest('4kAkN49IKJBhZDk6'))
is_private = match.group(1) == "/c"
print(f"{is_private=}")
chat = int(match.group(2)) if is_private else match.group(2)
post_id = int(match.group(3))
result = Metadata()
# NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token)
with self.client.start():
try: try:
post = self.client.get_messages(chat, ids=post_id) post = self.client.get_messages(chat, ids=post_id)
except ValueError as e: except ValueError as e:
logger.error(f"Could not fetch telegram {url} possibly it's private: {e}") logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
return False return False
except ChannelInvalidError as e: except ChannelInvalidError as e:
logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}") logger.error(f"Could not fetch telegram {url}. This error may be fixed if you setup a bot_token in addition to api_id and api_hash (but then private channels will not be archived, we need to update this logic to handle both): {e}")
return False return False
if post is None: return False if post is None: return False
print(post) logger.info(f"fetched telegram {post.id=}")
media_posts = self._get_media_posts_in_group(chat, post)
logger.debug(f'got {len(media_posts)=} for {url=}')
tmp_dir = item.get("tmp_dir")
group_id = post.grouped_id if post.grouped_id is not None else post.id
title = post.message
for mp in media_posts:
if len(mp.message) > len(title): title = mp.message # save the longest text found (usually only 1)
# media can also be in entities
if mp.entities:
other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image"]]
logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
for om_url in other_media_urls:
filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}')
self.download_from_url(om_url, filename)
result.add_media(filename)
filename_dest = os.path.join(tmp_dir, f'{chat}_{group_id}', str(mp.id))
filename = self.client.download_media(mp.media, filename_dest)
if not filename:
logger.debug(f"Empty media found, skipping {str(mp)=}")
continue
result.add_media(filename)
result.set("post", post).set_title(title).set_timestamp(post.date)
return result
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
"""
Searches for Telegram posts that are part of the same group of uploads
The search is conducted around the id of the original post with an amplitude
of `max_amp` both ways
Returns a list of [post] where each post has media and is in the same grouped_id
"""
if getattr(original_post, "grouped_id", None) is None:
return [original_post] if getattr(original_post, "media", False) else []
search_ids = [i for i in range(original_post.id - max_amp, original_post.id + max_amp + 1)]
posts = self.client.get_messages(chat, ids=search_ids)
media = []
for post in posts:
if post is not None and post.grouped_id == original_post.grouped_id and post.media is not None:
media.append(post)
return media

Wyświetl plik

@@ -1,7 +1,9 @@
from __future__ import annotations from __future__ import annotations
from ast import List
from typing import Any, Union, Dict from typing import Any, Union, Dict
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
@dataclass @dataclass
@@ -15,8 +17,8 @@ class Metadata:
metadata: Dict[str, Any] metadata: Dict[str, Any]
# TODO: remove and use default? # TODO: remove and use default?
def __init__(self) -> None: def __init__(self, status="") -> None:
self.status = "" self.status = status
self.metadata = {} self.metadata = {}
# @staticmethod # @staticmethod
@@ -27,14 +29,38 @@ class Metadata:
pass pass
# TODO: setters? # TODO: setters?
def set(self, key: str, val: Any) -> Union[Metadata, str]: def set(self, key: str, val: Any) -> Metadata:
# goes through metadata and returns the Metadata available # goes through metadata and returns the Metadata available
self.metadata[key] = val self.metadata[key] = val
return self
def get(self, key: str, default: Any = None) -> Union[Metadata, str]: def get(self, key: str, default: Any = None) -> Union[Metadata, str]:
# goes through metadata and returns the Metadata available # goes through metadata and returns the Metadata available
return self.metadata.get(key, default) return self.metadata.get(key, default)
# custom getter/setters
def set_url(self, url: str) -> Metadata:
assert type(url) is str and len(url) > 0, "invalid URL"
return self.set("url", url)
def get_url(self) -> str:
url = self.get("url")
assert type(url) is str and len(url) > 0, "invalid URL"
return url
def get_media(self) -> List:
return self.get("media", [])
def set_title(self, title: str) -> Metadata:
return self.set("title", title)
def set_timestamp(self, title: datetime) -> Metadata:
return self.set("title", title)
def add_media(self, filename: str) -> Metadata:
return self.get_media().append(filename)
def as_json(self) -> str: def as_json(self) -> str:
# converts all metadata and data into JSON # converts all metadata and data into JSON
pass pass

Wyświetl plik

@@ -6,6 +6,7 @@ from archivers.archiver import Archiverv2
from enrichers.enricher import Enricher from enrichers.enricher import Enricher
from metadata import Metadata from metadata import Metadata
import tempfile, time
""" """
how not to couple the different pieces of logic how not to couple the different pieces of logic
@@ -155,19 +156,24 @@ class ArchivingOrchestrator:
def feed(self) -> list(ArchiveResult): def feed(self) -> list(ArchiveResult):
for url in self.feeder: for url in self.feeder:
print("ARCHIVING", url) print("ARCHIVING", url)
self.archive(url) with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
self.archive(url, tmp_dir)
print("holding on")
time.sleep(300)
# how does this handle the parameters like folder which can be different for each archiver? # how does this handle the parameters like folder which can be different for each archiver?
# the storage needs to know where to archive!! # the storage needs to know where to archive!!
# solution: feeders have context: extra metadata that they can read or ignore, # solution: feeders have context: extra metadata that they can read or ignore,
# all of it should have sensible defaults (eg: folder) # all of it should have sensible defaults (eg: folder)
# default feeder is a list with 1 element # default feeder is a list with 1 element
def archive(self, url) -> Union[ArchiveResult, None]: def archive(self, url: str, tmp_dir: str) -> Union[Metadata, None]:
# TODO: # TODO:
# url = clear_url(url) # url = clear_url(url)
# result = Metadata(url=url) # result = Metadata(url=url)
result = Metadata() result = Metadata()
result.set("url", url) result.set_url(url)
result.set("tmp_dir", tmp_dir)
should_archive = True should_archive = True
for d in self.databases: should_archive &= d.should_process(url) for d in self.databases: should_archive &= d.should_process(url)

Wyświetl plik

@@ -29,8 +29,3 @@ class Step(ABC):
print(sub.name, "CALLING NEW") print(sub.name, "CALLING NEW")
return sub(config) return sub(config)
raise ClassFoundException(f"Unable to initialize STEP with {name=}") raise ClassFoundException(f"Unable to initialize STEP with {name=}")
def get_url(self, item: Metadata) -> str:
url = item.get("url")
assert type(url) is str and len(url) > 0
return url