diff --git a/src/auto_archiver/archivers/twitter_archiver.py b/src/auto_archiver/archivers/twitter_archiver.py
index 1a356ca..a925be6 100644
--- a/src/auto_archiver/archivers/twitter_archiver.py
+++ b/src/auto_archiver/archivers/twitter_archiver.py
@@ -1,4 +1,4 @@
-import re, requests, mimetypes, json
+import re, requests, mimetypes, json, math
 from typing import Union
 from datetime import datetime
 from loguru import logger
@@ -59,17 +59,54 @@ class TwitterArchiver(Archiver):
         logger.warning(f"No free strategy worked for {url}")
         return False
 
+
+    def generate_token(self, tweet_id: str) -> str:
+        """Generates the syndication token for a tweet ID.
+
+        Taken from https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
+        And Vercel's code: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27
+        """
+
+        # Perform the division and multiplication by π
+        result = (int(tweet_id) / 1e15) * math.pi
+        fractional_part = result % 1
+
+        # Convert the integer part to base 36
+        base_36 = ''
+        while result >= 1:
+            base_36 = "0123456789abcdefghijklmnopqrstuvwxyz"[int(result % 36)] + base_36
+            result = math.floor(result / 36)
+
+        # Append the fractional part in base 36
+        while fractional_part > 0 and len(base_36) < 11:  # Limit to avoid infinite loop
+            fractional_part *= 36
+            digit = int(fractional_part)
+            base_36 += "0123456789abcdefghijklmnopqrstuvwxyz"[digit]
+            fractional_part -= digit
+
+        # Remove all zeros and dots
+        return base_36.replace('0', '').replace('.', '')
+
+
     def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
         """
-        Hack alternative working again.
-        https://stackoverflow.com/a/71867055/6196010 (OUTDATED URL)
+        Downloads tweets using Twitter's own embed API (Hack).
+
+        Background on method can be found at: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-1615937362
+        https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
 
         next to test: https://cdn.embedly.com/widgets/media.html?&schema=twitter&url=https://twitter.com/bellingcat/status/1674700676612386816
         """
-        hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
-        r = requests.get(hack_url)
+        hack_url = "https://cdn.syndication.twimg.com/tweet-result"
+        params = {
+            'id': tweet_id,
+            'token': self.generate_token(tweet_id)
+        }
+
+        r = requests.get(hack_url, params=params, timeout=10)
         if r.status_code != 200 or r.json()=={}:
             logger.warning(f"SyndicationHack: Failed to get tweet information from {hack_url}.")
             return False
@@ -86,7 +123,7 @@ class TwitterArchiver(Archiver):
                 v = tweet["video"]
                 urls.append(self.choose_variant(v.get("variants", []))['url'])
 
-        logger.debug(f"Twitter hack got {urls=}")
+        logger.debug(f"Twitter hack got media {urls=}")
 
         for i, u in enumerate(urls):
             media = Media(filename="")
@@ -107,10 +144,18 @@ class TwitterArchiver(Archiver):
         tie = TwitterIE(downloader)
         tweet = tie._extract_status(tweet_id)
         result = Metadata()
+        try:
+            if not tweet.get("user") or not tweet.get("created_at"):
+                raise ValueError(f"Error retrieving post with id {tweet_id}. Are you sure it exists?")
+            timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
+        except (ValueError, KeyError) as ex:
+            logger.warning(f"Unable to parse tweet: {str(ex)}\nRetrieved tweet data: {tweet}")
+            return False
+
         result\
             .set_title(tweet.get('full_text', ''))\
             .set_content(json.dumps(tweet, ensure_ascii=False))\
-            .set_timestamp(datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y"))
+            .set_timestamp(timestamp)
         if not tweet.get("entities", {}).get("media"):
             logger.debug('No media found, archiving tweet text only')
             result.status = "twitter-ytdl"
diff --git a/src/auto_archiver/formatters/templates/macros.html b/src/auto_archiver/formatters/templates/macros.html
index 1373b43..772138f 100644
--- a/src/auto_archiver/formatters/templates/macros.html
+++ b/src/auto_archiver/formatters/templates/macros.html
@@ -3,7 +3,7 @@
 {% for url in m.urls %}
     {% if url | length == 0 %}
         No URL available for {{ m.key }}.
-    {% elif 'http' in url %}
+    {% elif 'http://' in url or 'https://' in url or url.startswith('/') %}
         {% if 'image' in m.mimetype %}
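Aside: the syndication token added above is pure arithmetic on the tweet ID, divide by 1e15, multiply by π, write the result in base 36, then strip every '0' and '.'. The following standalone sketch is not part of the diff (the helper name syndication_token and the __main__ usage are illustrative only); it mirrors generate_token and the new request parameters so the hack can be tried outside the archiver:

import math
import requests

BASE36 = "0123456789abcdefghijklmnopqrstuvwxyz"

def syndication_token(tweet_id: str) -> str:
    # (tweet_id / 1e15) * pi, rendered in base 36 with '0' and '.' removed,
    # mirroring generate_token in the diff above
    result = (int(tweet_id) / 1e15) * math.pi
    fractional_part = result % 1

    digits = ""
    while result >= 1:  # integer part, most significant digit first
        digits = BASE36[int(result % 36)] + digits
        result = math.floor(result / 36)

    while fractional_part > 0 and len(digits) < 11:  # cap length to avoid looping forever
        fractional_part *= 36
        digit = int(fractional_part)
        digits += BASE36[digit]
        fractional_part -= digit

    return digits.replace("0", "").replace(".", "")

if __name__ == "__main__":
    tweet_id = "1874097816571961839"
    # expected token for this ID is "4jjngwkifa" (see the test vectors below)
    r = requests.get(
        "https://cdn.syndication.twimg.com/tweet-result",
        params={"id": tweet_id, "token": syndication_token(tweet_id)},
        timeout=10,
    )
    print(r.status_code, r.json())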
diff --git a/tests/archivers/test_twitter_archiver.py b/tests/archivers/test_twitter_archiver.py
index 3eae3fb..858f12c 100644
--- a/tests/archivers/test_twitter_archiver.py
+++ b/tests/archivers/test_twitter_archiver.py
@@ -38,7 +38,6 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
         test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
         assert test_url == self.archiver.sanitize_url(test_url)
 
-
     def test_get_username_tweet_id_from_url(self):
 
         # test valid twitter URL
@@ -82,10 +81,45 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
             datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc),
             "twitter-ytdl"
         )
+
+    def test_reverse_engineer_token(self):
+        # see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2
+        # and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
+
+        for tweet_id, real_token in [
+            ("1874097816571961839", "4jjngwkifa"),
+            ("1674700676612386816", "42586mwa3uv"),
+            ("1877747914073620506", "4jv4aahw36n"),
+            ("1876710769913450647", "4jruzjz5lux"),
+            ("1346554693649113090", "39ibqxei7mo"),]:
+            generated_token = self.archiver.generate_token(tweet_id)
+            self.assertEqual(real_token, generated_token)
+
+    def test_syndication_archiver(self):
+
+        url = "https://x.com/bellingcat/status/1874097816571961839"
+        post = self.archiver.download_syndication(self.create_item(url), url, "1874097816571961839")
+        self.assertTrue(post)
+        self.assertValidResponseMetadata(
+            post,
+            "As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵",
+            datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
+        )
+
+    def test_download_nonexistent_tweet(self):
+        # this tweet does not exist
+        url = "https://x.com/Bellingcat/status/17197025860711058"
+        response = self.archiver.download(self.create_item(url))
+        self.assertFalse(response)
+
+    def test_download_malformed_tweetid(self):
+        # this tweet ID is malformed
+        url = "https://x.com/Bellingcat/status/1719702586071100058"
+        response = self.archiver.download(self.create_item(url))
+        self.assertFalse(response)
 
     @pytest.mark.download
     def test_download_tweet_no_media(self):
-        # url https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w
         item = self.create_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
         post = self.archiver.download(item)
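The token vectors added in test_reverse_engineer_token can also be spot-checked without the archiver at all; a minimal sketch, assuming the illustrative syndication_token helper from the earlier example (not part of the repository):

# Spot-check the PR's token vectors against the standalone helper sketched earlier.
VECTORS = {
    "1874097816571961839": "4jjngwkifa",
    "1674700676612386816": "42586mwa3uv",
    "1877747914073620506": "4jv4aahw36n",
    "1876710769913450647": "4jruzjz5lux",
    "1346554693649113090": "39ibqxei7mo",
}

for tweet_id, expected in VECTORS.items():
    assert syndication_token(tweet_id) == expected, (tweet_id, expected)
print("all token vectors match")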