kopia lustrzana https://github.com/bellingcat/auto-archiver
bot token
rodzic
067e6d8954
commit
562d2f51ad
|
@ -17,6 +17,7 @@ class TelethonArchiver(Archiver):
|
|||
def __init__(self, storage: Storage, driver, config: TelethonConfig):
|
||||
super().__init__(storage, driver)
|
||||
self.client = TelegramClient("./anon", config.api_id, config.api_hash)
|
||||
self.bot_token = config.bot_token
|
||||
|
||||
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
|
||||
"""
|
||||
|
@ -45,7 +46,7 @@ class TelethonArchiver(Archiver):
|
|||
status = "success"
|
||||
|
||||
# app will ask (stall for user input!) for phone number and auth code if anon.session not found
|
||||
with self.client.start():
|
||||
with self.client.start(bot_token=self.bot_token):
|
||||
matches = list(matches[0])
|
||||
chat, post_id = matches[1], matches[2]
|
||||
|
||||
|
@ -57,11 +58,11 @@ class TelethonArchiver(Archiver):
|
|||
logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
|
||||
return False
|
||||
except ChannelInvalidError as e:
|
||||
# TODO: check followup here: https://github.com/LonamiWebs/Telethon/issues/3819
|
||||
logger.error(f"Could not fetch telegram {url} possibly it's private or not displayable in : {e}")
|
||||
logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}")
|
||||
return False
|
||||
|
||||
media_posts = self._get_media_posts_in_group(chat, post)
|
||||
logger.debug(f'got {len(media_posts)=} for {url=}')
|
||||
|
||||
screenshot = self.get_screenshot(url)
|
||||
|
||||
|
@ -93,7 +94,7 @@ class TelethonArchiver(Archiver):
|
|||
return ArchiveResult(status=status, cdn_url=page_cdn, title=message, timestamp=post.date, hash=page_hash, screenshot=screenshot)
|
||||
elif len(media_posts) == 1:
|
||||
key = self.get_key(f'{chat}_{post_id}')
|
||||
filename = self.client.download_media(post.media, os.path.join(Storage.TMP_FOLDER,key))
|
||||
filename = self.client.download_media(post.media, os.path.join(Storage.TMP_FOLDER, key))
|
||||
key = filename.split(Storage.TMP_FOLDER)[1].replace(" ", "")
|
||||
self.storage.upload(filename, key)
|
||||
hash = self.get_hash(filename)
|
||||
|
|
|
@ -130,7 +130,8 @@ class Config:
|
|||
if "telegram" in secrets:
|
||||
self.telegram_config = TelethonConfig(
|
||||
api_id=secrets["telegram"]["api_id"],
|
||||
api_hash=secrets["telegram"]["api_hash"]
|
||||
api_hash=secrets["telegram"]["api_hash"],
|
||||
bot_token=getattr_or(secrets["telegram"], "bot_token")
|
||||
)
|
||||
else:
|
||||
logger.debug(f"'telegram' key not present in the {self.config_file=}")
|
||||
|
|
|
@ -4,4 +4,5 @@ from dataclasses import dataclass
|
|||
@dataclass
|
||||
class TelethonConfig:
|
||||
api_id: str
|
||||
api_hash: str
|
||||
api_hash: str
|
||||
bot_token: str
|
|
@ -16,7 +16,8 @@
|
|||
},
|
||||
"telegram": {
|
||||
"api_id": "your API key, see https://telegra.ph/How-to-get-Telegram-APP-ID--API-HASH-05-27",
|
||||
"api_hash": "your API hash"
|
||||
"api_hash": "your API hash",
|
||||
"bot_token": "optional, but allows access to more content such as large videos, talk to @botfather"
|
||||
},
|
||||
"google_sheets": {
|
||||
"service_account": "normally service_account.json, see https://gspread.readthedocs.io/en/latest/oauth2.html#for-bots-using-service-account"
|
||||
|
|
|
@ -19,7 +19,7 @@ def expand_url(url):
|
|||
logger.error(f'Failed to expand url {url}')
|
||||
return url
|
||||
|
||||
def getattr_or(o: object, prop: str, default: None = None):
|
||||
def getattr_or(o: object, prop: str, default = None):
|
||||
try:
|
||||
res = getattr(o, prop)
|
||||
if res is None: raise
|
||||
|
|
Ładowanie…
Reference in New Issue