kopia lustrzana https://github.com/bellingcat/auto-archiver
Merge pull request #249 from bellingcat/tikwm_dropin
Move tikwm extractor into a droping for the generic extractorpull/244/head
commit
5f7a8b1ac0
|
@ -12,6 +12,8 @@ from loguru import logger
|
|||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
||||
class SkipYtdlp(Exception):
|
||||
pass
|
||||
class GenericExtractor(Extractor):
|
||||
_dropins = {}
|
||||
|
||||
|
@ -268,7 +270,8 @@ class GenericExtractor(Extractor):
|
|||
|
||||
try:
|
||||
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
|
||||
raise Exception(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
|
||||
logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
|
||||
raise SkipYtdlp()
|
||||
|
||||
# don't download since it can be a live stream
|
||||
data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False)
|
||||
|
@ -282,15 +285,19 @@ class GenericExtractor(Extractor):
|
|||
if info_extractor.ie_key() == "generic":
|
||||
# don't clutter the logs with issues about the 'generic' extractor not having a dropin
|
||||
return False
|
||||
|
||||
if not isinstance(e, SkipYtdlp):
|
||||
logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead')
|
||||
|
||||
logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead')
|
||||
try:
|
||||
result = self.get_metadata_for_post(info_extractor, url, ydl)
|
||||
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
|
||||
logger.error(f'Error downloading metadata for post: {post_e}')
|
||||
logger.error("Error downloading metadata for post: {error}", error=str(post_e))
|
||||
return False
|
||||
except Exception as generic_e:
|
||||
logger.debug(f'Attempt to extract using ytdlp extractor "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True)
|
||||
logger.debug('Attempt to extract using ytdlp extractor "{name}" failed: \n {error}',
|
||||
name=info_extractor.IE_NAME, error=str(generic_e),
|
||||
exc_info=True)
|
||||
return False
|
||||
|
||||
if result:
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
import requests
|
||||
from loguru import logger
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from datetime import datetime, timezone
|
||||
from .dropin import GenericDropin
|
||||
|
||||
class Tiktok(GenericDropin):
|
||||
"""
|
||||
TikTok droping for the Generic Extractor that uses an unofficial API if/when ytdlp fails.
|
||||
It's useful for capturing content that requires a login, like sensitive content.
|
||||
"""
|
||||
|
||||
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
||||
|
||||
def extract_post(self, url: str, ie_instance):
|
||||
|
||||
logger.debug(f"Using Tikwm API to attempt to download tiktok video from {url=}")
|
||||
|
||||
endpoint = self.TIKWM_ENDPOINT.format(url=url)
|
||||
|
||||
r = requests.get(endpoint)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
|
||||
|
||||
try:
|
||||
json_response = r.json()
|
||||
except ValueError:
|
||||
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
|
||||
|
||||
if not json_response.get('msg') == 'success' or not (api_data := json_response.get('data', {})):
|
||||
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
|
||||
|
||||
# tries to get the non-watermarked version first
|
||||
video_url = api_data.pop("play", api_data.pop("wmplay", None))
|
||||
if not video_url:
|
||||
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
|
||||
|
||||
api_data['video_url'] = video_url
|
||||
return api_data
|
||||
|
||||
|
||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||
|
||||
# prepare result, start by downloading video
|
||||
result = Metadata()
|
||||
video_url = post.pop("video_url")
|
||||
|
||||
# get the cover if possible
|
||||
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
|
||||
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
|
||||
result.add_media(Media(cover_downloaded))
|
||||
|
||||
# get the video or fail
|
||||
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
|
||||
if not video_downloaded:
|
||||
logger.error(f"failed to download video from {video_url}")
|
||||
return False
|
||||
video_media = Media(video_downloaded)
|
||||
if duration := post.pop("duration", None):
|
||||
video_media.set("duration", duration)
|
||||
result.add_media(video_media)
|
||||
|
||||
# add remaining metadata
|
||||
result.set_title(post.pop("title", ""))
|
||||
|
||||
if created_at := post.pop("create_time", None):
|
||||
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
||||
|
||||
if (author := post.pop("author", None)):
|
||||
result.set("author", author)
|
||||
|
||||
result.set("api_data", post)
|
||||
|
||||
return result
|
|
@ -1 +0,0 @@
|
|||
from .tiktok_tikwm_extractor import TiktokTikwmExtractor
|
|
@ -1,23 +0,0 @@
|
|||
{
|
||||
"name": "Tiktok Tikwm Extractor",
|
||||
"type": ["extractor"],
|
||||
"requires_setup": False,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "requests"],
|
||||
"bin": []
|
||||
},
|
||||
"description": """
|
||||
Uses an unofficial TikTok video download platform's API to download videos: https://tikwm.com/
|
||||
|
||||
This extractor complements the generic_extractor which can already get TikTok videos, but this one can extract special videos like those marked as sensitive.
|
||||
|
||||
### Features
|
||||
- Downloads the video and, if possible, also the video cover.
|
||||
- Stores extra metadata about the post like author information, and more as returned by tikwm.com.
|
||||
|
||||
### Notes
|
||||
- If tikwm.com is down, this extractor will not work.
|
||||
- If tikwm.com changes their API, this extractor may break.
|
||||
- If no video is found, this extractor will consider the extraction failed.
|
||||
"""
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
import re
|
||||
import requests
|
||||
from loguru import logger
|
||||
from datetime import datetime, timezone
|
||||
from yt_dlp.extractor.tiktok import TikTokIE
|
||||
|
||||
from auto_archiver.core import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
||||
|
||||
class TiktokTikwmExtractor(Extractor):
|
||||
"""
|
||||
Extractor for TikTok that uses an unofficial API and can capture content that requires a login, like sensitive content.
|
||||
"""
|
||||
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
||||
|
||||
def download(self, item: Metadata) -> Metadata:
|
||||
url = item.get_url()
|
||||
|
||||
if not re.match(TikTokIE._VALID_URL, url):
|
||||
return False
|
||||
|
||||
endpoint = TiktokTikwmExtractor.TIKWM_ENDPOINT.format(url=url)
|
||||
|
||||
r = requests.get(endpoint)
|
||||
if r.status_code != 200:
|
||||
logger.error(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
|
||||
return False
|
||||
|
||||
try:
|
||||
json_response = r.json()
|
||||
except ValueError:
|
||||
logger.error(f"failed to parse JSON response from tikwm.com for {url=}")
|
||||
return False
|
||||
|
||||
if not json_response.get('msg') == 'success' or not (api_data := json_response.get('data', {})):
|
||||
logger.error(f"failed to get a valid response from tikwm.com for {url=}: {json_response}")
|
||||
return False
|
||||
|
||||
# tries to get the non-watermarked version first
|
||||
video_url = api_data.pop("play", api_data.pop("wmplay", None))
|
||||
if not video_url:
|
||||
logger.error(f"no valid video URL found in response from tikwm.com for {url=}")
|
||||
return False
|
||||
|
||||
# prepare result, start by downloading video
|
||||
result = Metadata()
|
||||
|
||||
# get the cover if possible
|
||||
cover_url = api_data.pop("origin_cover", api_data.pop("cover", api_data.pop("ai_dynamic_cover", None)))
|
||||
if cover_url and (cover_downloaded := self.download_from_url(cover_url)):
|
||||
result.add_media(Media(cover_downloaded))
|
||||
|
||||
# get the video or fail
|
||||
video_downloaded = self.download_from_url(video_url, f"vid_{api_data.get('id', '')}")
|
||||
if not video_downloaded:
|
||||
logger.error(f"failed to download video from {video_url}")
|
||||
return False
|
||||
video_media = Media(video_downloaded)
|
||||
if duration := api_data.pop("duration", None):
|
||||
video_media.set("duration", duration)
|
||||
result.add_media(video_media)
|
||||
|
||||
# add remaining metadata
|
||||
result.set_title(api_data.pop("title", ""))
|
||||
|
||||
if created_at := api_data.pop("create_time", None):
|
||||
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
||||
|
||||
if (author := api_data.pop("author", None)):
|
||||
result.set("author", author)
|
||||
|
||||
result.set("api_data", api_data)
|
||||
|
||||
return result.success("tikwm")
|
|
@ -1,86 +1,72 @@
|
|||
from datetime import datetime, timezone
|
||||
import time
|
||||
import pytest
|
||||
import yt_dlp
|
||||
|
||||
from auto_archiver.modules.tiktok_tikwm_extractor.tiktok_tikwm_extractor import TiktokTikwmExtractor
|
||||
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def skip_ytdlp_own_methods(mocker):
|
||||
# mock this method, so that we skip the ytdlp download in these tests
|
||||
mocker.patch("auto_archiver.modules.generic_extractor.tiktok.Tiktok.skip_ytdlp_download", return_value=True)
|
||||
mocker.patch("auto_archiver.modules.generic_extractor.generic_extractor.GenericExtractor.suitable_extractors",
|
||||
return_value=[e for e in yt_dlp.YoutubeDL()._ies.values() if e.IE_NAME == 'TikTok'])
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_get(mocker):
|
||||
return mocker.patch("auto_archiver.modules.generic_extractor.tiktok.requests.get")
|
||||
|
||||
class TestTiktokTikwmExtractor(TestExtractorBase):
|
||||
"""
|
||||
Test suite for TestTiktokTikwmExtractor.
|
||||
"""
|
||||
|
||||
extractor_module = "tiktok_tikwm_extractor"
|
||||
extractor: TiktokTikwmExtractor
|
||||
extractor_module = "generic_extractor"
|
||||
extractor: GenericExtractor
|
||||
|
||||
config = {}
|
||||
|
||||
VALID_EXAMPLE_URL = "https://www.tiktok.com/@example/video/1234"
|
||||
|
||||
@staticmethod
|
||||
def get_mockers(mocker):
|
||||
mock_get = mocker.patch("auto_archiver.modules.tiktok_tikwm_extractor.tiktok_tikwm_extractor.requests.get")
|
||||
mock_logger = mocker.patch("auto_archiver.modules.tiktok_tikwm_extractor.tiktok_tikwm_extractor.logger")
|
||||
return mock_get, mock_logger
|
||||
|
||||
@pytest.mark.parametrize("url,valid_url", [
|
||||
("https://bellingcat.com", False),
|
||||
("https://youtube.com", False),
|
||||
("https://tiktok.co/", False),
|
||||
("https://tiktok.com/", False),
|
||||
("https://www.tiktok.com/", False),
|
||||
("https://api.cool.tiktok.com/", False),
|
||||
(VALID_EXAMPLE_URL, True),
|
||||
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
|
||||
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
|
||||
])
|
||||
def test_valid_urls(self, mocker, make_item, url, valid_url):
|
||||
mock_get, mock_logger = self.get_mockers(mocker)
|
||||
if valid_url:
|
||||
mock_get.return_value.status_code = 404
|
||||
assert self.extractor.download(make_item(url)) == False
|
||||
assert mock_get.call_count == int(valid_url)
|
||||
assert mock_logger.error.call_count == int(valid_url)
|
||||
|
||||
def test_invalid_json_responses(self, mocker, make_item):
|
||||
mock_get, mock_logger = self.get_mockers(mocker)
|
||||
def test_invalid_json_responses(self, mock_get, make_item, caplog):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.side_effect = ValueError
|
||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
|
||||
mock_get.assert_called_once()
|
||||
mock_get.return_value.json.assert_called_once()
|
||||
mock_logger.error.assert_called_once()
|
||||
assert mock_logger.error.call_args[0][0].startswith("failed to parse JSON response")
|
||||
with caplog.at_level('DEBUG'):
|
||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
|
||||
mock_get.assert_called_once()
|
||||
mock_get.return_value.json.assert_called_once()
|
||||
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
|
||||
assert "failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'" in caplog.text
|
||||
|
||||
mock_get.return_value.json.side_effect = Exception
|
||||
with pytest.raises(Exception):
|
||||
self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
|
||||
mock_get.assert_called()
|
||||
assert mock_get.call_count == 2
|
||||
assert mock_get.return_value.json.call_count == 2
|
||||
with caplog.at_level('ERROR'):
|
||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
|
||||
mock_get.assert_called()
|
||||
assert mock_get.call_count == 2
|
||||
assert mock_get.return_value.json.call_count == 2
|
||||
assert "failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'" in caplog.text
|
||||
|
||||
@pytest.mark.parametrize("response", [
|
||||
({"msg": "failure"}),
|
||||
({"msg": "success"}),
|
||||
])
|
||||
def test_unsuccessful_responses(self, mocker, make_item, response):
|
||||
mock_get, mock_logger = self.get_mockers(mocker)
|
||||
def test_unsuccessful_responses(self, mock_get, make_item, response, caplog):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = response
|
||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
|
||||
mock_get.assert_called_once()
|
||||
mock_get.return_value.json.assert_called_once()
|
||||
mock_logger.error.assert_called_once()
|
||||
assert mock_logger.error.call_args[0][0].startswith("failed to get a valid response")
|
||||
with caplog.at_level('DEBUG'):
|
||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
|
||||
mock_get.assert_called_once()
|
||||
mock_get.return_value.json.assert_called_once()
|
||||
assert "failed to get a valid response from tikwm.com" in caplog.text
|
||||
|
||||
@pytest.mark.parametrize("response,has_vid", [
|
||||
({"data": {"id": 123}}, False),
|
||||
({"data": {"wmplay": "url"}}, True),
|
||||
({"data": {"play": "url"}}, True),
|
||||
])
|
||||
def test_correct_extraction(self, mocker, make_item, response, has_vid):
|
||||
mock_get, mock_logger = self.get_mockers(mocker)
|
||||
def test_correct_extraction(self, mock_get, make_item, response, has_vid):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = {"msg": "success", **response}
|
||||
|
||||
|
@ -99,8 +85,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||
else:
|
||||
mock_logger.error.assert_not_called()
|
||||
|
||||
def test_correct_extraction(self, mocker, make_item):
|
||||
mock_get, _ = self.get_mockers(mocker)
|
||||
def test_correct_extraction(self, mock_get, make_item):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = {"msg": "success", "data": {
|
||||
"wmplay": "url",
|
||||
|
|
Ładowanie…
Reference in New Issue