mirror of https://github.com/bellingcat/auto-archiver

twitter archivers

parent f1bc83818d
commit 725bab8240
@@ -2,13 +2,15 @@
 from .base_archiver import Archiver, ArchiveResult
 from .archiver import Archiverv2
 from .telegram_archiver import TelegramArchiver
-from .telethon_archiver import TelethonArchiver
+# from .telethon_archiver import TelethonArchiver
 from .tiktok_archiver import TiktokArchiver
 from .wayback_archiver import WaybackArchiver
 from .youtubedl_archiver import YoutubeDLArchiver
-from .twitter_archiver import TwitterArchiver
+# from .twitter_archiver import TwitterArchiver
 from .vk_archiver import VkArchiver
-from .twitter_api_archiver import TwitterApiArchiver
+# from .twitter_api_archiver import TwitterApiArchiver
 from .instagram_archiver import InstagramArchiver

 from .telethon_archiverv2 import TelethonArchiver
+from .twitter_archiverv2 import TwitterArchiver
+from .twitter_api_archiverv2 import TwitterApiArchiver
@@ -13,7 +13,7 @@ from media import Media


 class TelethonArchiver(Archiverv2):
-    name = "telethon"
+    name = "telethon_archiver"
     link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
     invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")

@@ -145,8 +145,8 @@ class TelethonArchiver(Archiverv2):
                     continue
                 result.add_media(Media(filename))

-        result.set("post", str(post)).set_title(title).set_timestamp(post.date)
-        return result
+        result.set_content(str(post)).set_title(title).set_timestamp(post.date)
+        return result.success("telethon")

     def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
         """
@@ -0,0 +1,97 @@
import json
from datetime import datetime
import mimetypes
import os
from loguru import logger
from pytwitter import Api
from slugify import slugify

from metadata import Metadata
from media import Media
from .twitter_archiverv2 import TwitterArchiver
from .archiver import Archiverv2


class TwitterApiArchiver(TwitterArchiver, Archiverv2):
    name = "twitter_api_archiver"

    def __init__(self, config: dict) -> None:
        super().__init__(config)

        if self.bearer_token:
            self.api = Api(bearer_token=self.bearer_token)
        elif self.consumer_key and self.consumer_secret and self.access_token and self.access_secret:
            self.api = Api(
                consumer_key=self.consumer_key, consumer_secret=self.consumer_secret, access_token=self.access_token, access_secret=self.access_secret)
        assert hasattr(self, "api") and self.api is not None, "Missing Twitter API configurations, please provide either bearer_token OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver."

    @staticmethod
    def configs() -> dict:
        return {
            "bearer_token": {"default": None, "help": "twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
            "consumer_key": {"default": None, "help": "twitter API consumer_key"},
            "consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
            "access_token": {"default": None, "help": "twitter API access_token"},
            "access_secret": {"default": None, "help": "twitter API access_secret"},
        }

    def download(self, item: Metadata) -> Metadata:
        url = item.get_url()
        # detect URLs that we definitely cannot handle
        username, tweet_id = self.get_username_tweet_id(url)
        if not username: return False

        try:
            tweet = self.api.get_tweet(tweet_id, expansions=["attachments.media_keys"], media_fields=["type", "duration_ms", "url", "variants"], tweet_fields=["attachments", "author_id", "created_at", "entities", "id", "text", "possibly_sensitive"])
        except Exception as e:
            logger.error(f"Could not get tweet: {e}")
            return False

        result = Metadata()
        result.set_title(tweet.data.text)
        result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))

        urls = []
        if tweet.includes:
            for i, m in enumerate(tweet.includes.media):
                media = Media(filename="")
                if m.url and len(m.url):
                    media.set("src", m.url)
                    media.set("duration", (m.duration_ms or 1) // 1000)
                    mimetype = "image/jpeg"
                elif hasattr(m, "variants"):
                    variant = self.choose_variant(m.variants)
                    if not variant: continue
                    media.set("src", variant.url)
                    mimetype = variant.content_type
                else:
                    continue
                logger.info(f"Found media {media}")
                ext = mimetypes.guess_extension(mimetype)
                media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}{ext}')
                self.download_from_url(media.get("src"), media.filename)
                result.add_media(media)

        result.set_content(json.dumps({
            "id": tweet.data.id,
            "text": tweet.data.text,
            "created_at": tweet.data.created_at,
            "author_id": tweet.data.author_id,
            "geo": tweet.data.geo,
            "lang": tweet.data.lang,
            "media": urls
        }, ensure_ascii=False, indent=4))
        return result.success("twitter")

    def choose_variant(self, variants):
        # choosing the highest quality possible
        variant, bit_rate = None, -1
        for var in variants:
            if var.content_type == "video/mp4":
                if var.bit_rate > bit_rate:
                    bit_rate = var.bit_rate
                    variant = var
            else:
                variant = var if not variant else variant
        return variant
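The class above is driven entirely by the keys returned from configs(): a bearer_token alone is enough, otherwise all four consumer/access credentials must be present or the assert in __init__ fires. A minimal usage sketch, assuming the base Archiverv2 copies these config keys onto instance attributes and that a temporary directory has already been set on the item, as the orchestrator normally does (the token and tweet URL below are placeholders):

    from metadata import Metadata
    from archivers import TwitterApiArchiver

    # placeholder credentials; a real bearer token comes from the Twitter developer portal
    archiver = TwitterApiArchiver({"bearer_token": "<BEARER_TOKEN>"})

    # the orchestrator normally builds this item and assigns its tmp dir before download()
    item = Metadata().set_url("https://twitter.com/example_user/status/1234567890")
    result = archiver.download(item)

    if result and result.is_success():
        for m in result.media:
            print(m.get("src"), "->", m.filename)   # remote URL and local copy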
@@ -0,0 +1,137 @@
import html, re, requests
import mimetypes
import json
import os
from datetime import datetime
from loguru import logger
from metadata import Metadata
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from archivers import Archiverv2
from media import Media
from slugify import slugify


class TwitterArchiver(Archiverv2):
    """
    This Twitter Archiver uses unofficial scraping methods.
    """

    name = "twitter_archiver"
    link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")

    def __init__(self, config: dict) -> None:
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        return {}

    def download(self, item: Metadata) -> Metadata:
        """
        if this url is archivable will download post info and look for other posts from the same group with media.
        can handle private/public channels
        """
        url = item.get_url()
        # detect URLs that we definitely cannot handle
        username, tweet_id = self.get_username_tweet_id(url)
        if not username: return False

        result = Metadata()

        scr = TwitterTweetScraper(tweet_id)
        try:
            tweet = next(scr.get_items())
        except Exception as ex:
            logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
            return self.download_alternative(item, url, tweet_id)

        result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
        if tweet.media is None:
            logger.debug(f'No media found, archiving tweet text only')
            return result

        for i, tweet_media in enumerate(tweet.media):
            media = Media(filename="")
            mimetype = ""
            if type(tweet_media) == Video:
                variant = max(
                    [v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate)
                media.set("src", variant.url).set("duration", tweet_media.duration)
                mimetype = variant.contentType
            elif type(tweet_media) == Gif:
                variant = tweet_media.variants[0]
                media.set("src", variant.url)
                mimetype = variant.contentType
            elif type(tweet_media) == Photo:
                media.set("src", tweet_media.fullUrl.replace('name=large', 'name=orig'))
                mimetype = "image/jpeg"
            else:
                logger.warning(f"Could not get media URL of {tweet_media}")
                continue
            ext = mimetypes.guess_extension(mimetype)
            media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}{ext}')
            self.download_from_url(media.get("src"), media.filename)
            result.add_media(media)

        return result.success("twitter")

    def download_alternative(self, item: Metadata, url: str, tweet_id: str) -> Metadata:
        """
        CURRENTLY STOPPED WORKING
        """
        return False
        # https://stackoverflow.com/a/71867055/6196010
        logger.debug(f"Trying twitter hack for {url=}")
        result = Metadata()

        hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
        r = requests.get(hack_url)
        if r.status_code != 200: return False
        tweet = r.json()

        urls = []
        for p in tweet["photos"]:
            urls.append(p["url"])

        # 1 tweet has 1 video max
        if "video" in tweet:
            v = tweet["video"]
            urls.append(self.choose_variant(v.get("variants", [])))

        logger.debug(f"Twitter hack got {urls=}")

        for u in urls:
            media = Media()
            media.set("src", u)
            media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}')
            self.download_from_url(u, media.filename)
            result.add_media(media)

        # .set_title(tweet["TODO"])
        result.set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
        return result

    def get_username_tweet_id(self, url):
        # detect URLs that we definitely cannot handle
        matches = self.link_pattern.findall(url)
        if not len(matches): return False, False

        username, tweet_id = matches[0]  # only one URL supported
        logger.debug(f"Found {username=} and {tweet_id=} in {url=}")

        return username, tweet_id

    def choose_variant(self, variants):
        # choosing the highest quality possible
        variant, width, height = None, 0, 0
        for var in variants:
            if var.get("type", "") == "video/mp4":
                width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
                if width_height:
                    w, h = int(width_height[1]), int(width_height[2])
                    if w > width or h > height:
                        width, height = w, h
                        variant = var.get("src", variant)
            else:
                variant = var.get("src") if not variant else variant
        return variant
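Two pieces of this file are easy to sanity-check in isolation: link_pattern, which get_username_tweet_id uses to pull the handle and status id out of a URL, and the fallback choose_variant, which walks syndication-style variant dicts and keeps the mp4 with the largest WxH segment in its path. A short sketch with made-up URLs that restates the same selection rule inline:

    import re

    # URL parsing used by get_username_tweet_id (the tweet URL is made up)
    link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
    print(link_pattern.findall("https://twitter.com/example_user/status/1234567890"))
    # [('example_user', '1234567890')]

    # choose_variant on syndication-style variants: the mp4 with the largest WxH wins
    variants = [
        {"type": "video/mp4", "src": "https://video.example.com/vid/480x270/low.mp4"},
        {"type": "video/mp4", "src": "https://video.example.com/vid/1280x720/high.mp4"},
        {"type": "application/x-mpegURL", "src": "https://video.example.com/pl/playlist.m3u8"},
    ]
    best, width, height = None, 0, 0
    for var in variants:
        if var.get("type", "") == "video/mp4":
            wh = re.search(r"\/(\d+)x(\d+)\/", var["src"])
            if wh and (int(wh[1]) > width or int(wh[2]) > height):
                width, height = int(wh[1]), int(wh[2])
                best = var["src"]
        else:
            best = var.get("src") if not best else best
    print(best)  # https://video.example.com/vid/1280x720/high.mp4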
@@ -25,6 +25,7 @@ class ThumbnailEnricher(Enricher):
         folder = os.path.join(to_enrich.get_tmp_dir(), str(uuid.uuid4()))
         os.makedirs(folder, exist_ok=True)
         for i, m in enumerate(to_enrich.media[::]):
+            logger.info(m)
             if m.is_video():
                 logger.debug(f"generating thumbnails for {m.filename}")
                 fps, duration = 0.5, m.get("duration")
@@ -1,5 +1,5 @@
 {# templates/results.html #}
-{% import 'media.html' as macros %}
+{% import 'macros.html' as macros %}
 <!DOCTYPE html>
 <html lang="en">
@@ -133,8 +133,8 @@
                 </div>
             </div>
             <p></p>
-        {% elif m.properties[prop] | length > 1 %}
-            <li><b>{{ prop }}:</b> <span class="copy">{{ m.properties[prop] }}</span></li>
+        {% elif m.properties[prop] | string | length > 1 %}
+            <li><b>{{ prop }}:</b> {{ macros.copy_urlize(m.properties[prop]) }}</li>
         {% endif %}

     {% endfor %}
@@ -156,7 +156,7 @@
         <tr>
             <td>{{ key }}</td>
             <td>
-                <span class="copy">{{ metadata[key] | urlize }}</span>
+                {{ macros.copy_urlize(metadata[key]) }}
             </td>
         </tr>
     {% endfor %}
@@ -30,4 +30,10 @@ No preview available for {{ m.key }}.
     {% endif %}
 {% endfor %}

 {%- endmacro -%}
+
+{% macro copy_urlize(val) -%}
+
+<span class="copy">{{ val | string | urlize }}</span>
+
+{%- endmacro -%}
@@ -12,10 +12,10 @@ from media import Media

 @dataclass
 class Metadata:
-    status: str = ""
+    status: str = "no archiver"
     _processed_at: datetime = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
     metadata: Dict[str, Any] = field(default_factory=dict)
-    tmp_keys: Set[str] = field(default_factory=set)  # keys that are not to be saved in DBs
+    tmp_keys: Set[str] = field(default_factory=set, repr=False)  # keys that are not to be saved in DBs
     media: List[Media] = field(default_factory=list)
     final_media: Media = None  # can be overwritten by formatters
     rearchivable: bool = False
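The two field changes are visible on a fresh instance: the status default is now the readable "no archiver" instead of an empty string, and repr=False keeps the DB-excluded tmp_keys out of the printed representation. A quick check, assuming Metadata keeps the default dataclass __repr__:

    m = Metadata()
    print(m.status)                # "no archiver" rather than ""
    print("tmp_keys" in repr(m))   # False: repr=False hides the field from the repr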
@@ -28,7 +28,7 @@ class Metadata:
         """
         merges two Metadata instances, will overwrite according to overwrite_left flag
         """
-        if right is None: return self
+        if not right: return self
         if overwrite_left:
             if right.status and len(right.status):
                 self.status = right.status
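Relaxing the guard from `is None` to `not right` matters because archivers return False, not None, when they cannot handle a URL, so the orchestrator can pass that return value straight into merge and get a no-op. A small sketch (the URL is a placeholder):

    result = Metadata().set_url("https://example.com/post/1")

    # an archiver that bailed out returns False; merging it changes nothing
    merged = result.merge(False)
    assert merged is result and result.get_url() == "https://example.com/post/1"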
@@ -58,8 +58,18 @@ class Metadata:
             self.metadata[key] = default
         return self.metadata.get(key, default)

+    def success(self, context: str = None) -> Metadata:
+        if context: self.status = f"{context}: success"
+        else: self.status = "success"
+        return self
+
+    def is_success(self) -> bool:
+        return "success" in self.status
+
+
     # custom getter/setters

     def set_url(self, url: str) -> Metadata:
         assert type(url) is str and len(url) > 0, "invalid URL"
         return self.set("url", url)
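Together with the "no archiver" default above, these two helpers give each Metadata a simple status lifecycle: it starts unsuccessful, and an archiver stamps it on the way out, as the `return result.success("telethon")` and `return result.success("twitter")` lines in the archivers above do. A short illustration:

    m = Metadata()
    print(m.is_success())   # False: status is still the "no archiver" default

    m.success("twitter")    # what an archiver calls just before returning its result
    print(m.status)         # "twitter: success"
    print(m.is_success())   # True

    print(Metadata().success().status)   # without a context the status is just "success"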
@@ -70,7 +80,7 @@ class Metadata:
         return url

     def set_content(self, content: str) -> Metadata:
-        # the main textual content/information from a social media post, webpage, ...
+        # a dump with all the relevant content
         return self.set("content", content)

     def set_title(self, title: str) -> Metadata:
@@ -112,8 +112,8 @@ class ArchivingOrchestrator:
                 logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
                 for d in self.databases: d.failed(item)

-            print("holding on 5min")
-            time.sleep(300)
+            print("holding on 5s")
+            time.sleep(5)

         # how does this handle the parameters like folder which can be different for each archiver?
         # the storage needs to know where to archive!!
@@ -161,9 +161,10 @@ class ArchivingOrchestrator:
                 # this is where the Hashes come from, the place with access to all content
                 # the archiver does not have access to storage
                 # a.download(result) # TODO: refactor so there's not merge here
+                logger.info(f"Trying archiver {a.name}")
                 result.merge(a.download(result))
-                # TODO: fix logic
-                if True or result.is_success(): break
+                # TODO: fix logic to halt when done
+                if result.is_success(): break

         # what if an archiver returns multiple entries and one is to be part of HTMLgenerator?
         # should it call the HTMLgenerator as if it's not an enrichment?