Fix beatport and dplay extractors

pull/9793/head
JerryZhouSirui 2024-04-26 16:12:35 -04:00
rodzic 89f535e265
commit 6f8649f213
2 zmienionych plików z 101 dodań i 60 usunięć

Wyświetl plik

@ -2,7 +2,7 @@ import re
from .common import InfoExtractor from .common import InfoExtractor
from ..compat import compat_str from ..compat import compat_str
from ..utils import int_or_none from ..utils import int_or_none, ExtractorError
class BeatportIE(InfoExtractor): class BeatportIE(InfoExtractor):
@ -43,55 +43,47 @@ class BeatportIE(InfoExtractor):
webpage = self._download_webpage(url, display_id) webpage = self._download_webpage(url, display_id)
playables = self._parse_json( try:
self._search_regex( playables_json = self._search_regex(
r'window\.Playables\s*=\s*({.+?});', webpage, r'window\.Playables\s*=\s*({.+?})\s*;', webpage,
'playables info', flags=re.DOTALL), 'playables info', default='{}', flags=re.DOTALL)
track_id) playables = self._parse_json(playables_json, track_id)
except re.error:
raise ExtractorError('Failed to extract playables information. The page structure may have changed.')
track = next(t for t in playables['tracks'] if t['id'] == int(track_id)) if not playables or 'tracks' not in playables:
raise ExtractorError('No playable tracks found in the extracted information.')
title = ', '.join((a['name'] for a in track['artists'])) + ' - ' + track['name'] track = next((t for t in playables['tracks'] if t['id'] == int(track_id)), None)
if track['mix']: if not track:
raise ExtractorError(f'No track with ID {track_id} found.')
title = ', '.join(a['name'] for a in track['artists']) + ' - ' + track['name']
if track.get('mix'):
title += ' (' + track['mix'] + ')' title += ' (' + track['mix'] + ')'
formats = [] formats = []
for ext, info in track['preview'].items(): for ext, info in track.get('preview', {}).items():
if not info['url']: url = info.get('url')
continue if url:
fmt = { fmt = {
'url': info['url'], 'url': url,
'ext': ext, 'ext': ext,
'format_id': ext, 'format_id': ext,
'vcodec': 'none', 'vcodec': 'none',
} 'acodec': 'mp3' if ext == 'mp3' else 'aac',
if ext == 'mp3': 'abr': 96,
fmt['acodec'] = 'mp3' 'asr': 44100
fmt['abr'] = 96 }
fmt['asr'] = 44100 formats.append(fmt)
elif ext == 'mp4':
fmt['acodec'] = 'aac'
fmt['abr'] = 96
fmt['asr'] = 44100
formats.append(fmt)
images = [] images = [{'id': name, 'url': info['url'], 'height': int_or_none(info.get('height')), 'width': int_or_none(info.get('width'))}
for name, info in track['images'].items(): for name, info in track.get('images', {}).items() if name != 'dynamic' and info.get('url')]
image_url = info.get('url')
if name == 'dynamic' or not image_url:
continue
image = {
'id': name,
'url': image_url,
'height': int_or_none(info.get('height')),
'width': int_or_none(info.get('width')),
}
images.append(image)
return { return {
'id': compat_str(track.get('id')) or track_id, 'id': compat_str(track.get('id', track_id)),
'display_id': track.get('slug') or display_id, 'display_id': track.get('slug', display_id),
'title': title, 'title': title,
'formats': formats, 'formats': formats,
'thumbnails': images, 'thumbnails': images
} }

Wyświetl plik

@ -1,6 +1,10 @@
import json import json
import uuid import uuid
from urllib.parse import urlsplit, urljoin
import requests
from .common import InfoExtractor from .common import InfoExtractor
from ..networking.exceptions import HTTPError from ..networking.exceptions import HTTPError
from ..utils import ( from ..utils import (
@ -49,32 +53,77 @@ class DPlayBaseIE(InfoExtractor):
'This video is only available for registered users. You may want to use --cookies.', expected=True) 'This video is only available for registered users. You may want to use --cookies.', expected=True)
raise ExtractorError(info['errors'][0]['detail'], expected=True) raise ExtractorError(info['errors'][0]['detail'], expected=True)
def _update_disco_api_headers(self, headers, disco_base, display_id, realm, api_version=2):
    """Set the ``Authorization`` entry of *headers* in place.

    The value is whatever ``self._get_auth`` returns. Its last positional
    argument is ``True`` for API version 3 and ``False`` for any other
    version (the pre-existing behaviour).

    NOTE(review): the meaning of that boolean is defined by ``_get_auth``,
    which is not visible here -- confirm before relying on it.

    :param headers: Mutable mapping of request headers; modified in place.
    :param disco_base: Base URL of the Disco API.
    :param display_id: Display ID forwarded to ``_get_auth``.
    :param realm: Realm forwarded to ``_get_auth``.
    :param api_version: Disco API version; defaults to 2.
    """
    is_v3 = api_version == 3
    headers['Authorization'] = self._get_auth(disco_base, display_id, realm, is_v3)
def _download_video_playback_info(self, disco_base, video_id, headers, api_version=2):
    """
    Fetch the streaming entries of a video from the Disco API.

    :param disco_base: The url base including the trailing slash,
        i.e. https://{region}{instance_number}-prod.disco-api.com/ .
    :param video_id: The Video ID; sent to the API and used for log output.
    :param headers: The headers to be used for the request (not modified).
    :param api_version: 3 selects the JSON POST endpoint
        ``playback/v3/videoPlaybackInfo``; any other value (default 2)
        keeps the old GET endpoint.
    :return: A list of dicts. For the old API every dict has exactly the
        keys 'type' and 'url'; for v3 the list from the API response is
        returned unchanged.
    :raises ExtractorError: If the first v3 streaming entry reports
        ``drmEnabled``.
    """
    if api_version == 3:
        # Use the extractor's own downloader instead of ``requests`` so that
        # HTTP failures reach the caller's ExtractorError handling, exactly
        # like in the old branch below.
        streaming_list = self._download_json(
            disco_base + 'playback/v3/videoPlaybackInfo', video_id,
            # Copy the headers: the caller reuses its dict for other requests.
            headers={**headers, 'Content-Type': 'application/json'},
            data=json.dumps({
                # deviceInfo is mandatory, some keys inside are optional!
                'deviceInfo': {
                    'adBlocker': False,
                    'drmSupported': False,
                },
                'videoId': str(video_id),
            }).encode('utf-8'))['data']['attributes']['streaming']

        # Explicit check instead of ``assert`` (which is stripped under -O);
        # tolerates an empty list and a missing 'protection' entry.
        first_entry = streaming_list[0] if streaming_list else {}
        if (first_entry.get('protection') or {}).get('drmEnabled'):
            raise ExtractorError('This video is DRM protected', expected=True)
        return streaming_list

    # old behaviour
    streaming = self._download_json(
        disco_base + 'playback/videoPlaybackInfo/' + video_id,
        video_id, headers=headers)['data']['attributes']['streaming']
    return [{
        'type': format_id,
        'url': format_dict.get('url'),
    } for format_id, format_dict in streaming.items()]
def _get_disco_api_info(self, url, display_id, disco_host, realm, country, domain=''): def _get_disco_api_info(self, url, display_id, disco_host, realm, country, domain='', api_version=2):
country = self.get_param('geo_bypass_country') or country country = self.get_param('geo_bypass_country') or country
geo_countries = [country.upper()] geo_countries = [country.upper()]
self._initialize_geo_bypass({ self._initialize_geo_bypass({
'countries': geo_countries, 'countries': geo_countries,
}) })
disco_base = 'https://%s/' % disco_host disco_base = 'https://%s/' % disco_host
headers = { if api_version == 3:
'Referer': url, url_base = "://".join(urlsplit(url)[:2])
} headers = {
self._update_disco_api_headers(headers, disco_base, display_id, realm) 'Referer': urljoin(base=url_base, url="/"),
'Origin': url_base,
}
else:
# old behaviour
headers = {
'Referer': url,
}
self._update_disco_api_headers(headers, disco_base, display_id, realm, api_version=api_version)
try: try:
video = self._download_json( video = self._download_json(
disco_base + 'content/videos/' + display_id, display_id, disco_base + 'content/videos/' + display_id, display_id,
@ -97,7 +146,7 @@ class DPlayBaseIE(InfoExtractor):
subtitles = {} subtitles = {}
try: try:
streaming = self._download_video_playback_info( streaming = self._download_video_playback_info(
disco_base, video_id, headers) disco_base, video_id, headers, api_version=api_version)
except ExtractorError as e: except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 403: if isinstance(e.cause, HTTPError) and e.cause.status == 403:
self._process_errors(e, geo_countries) self._process_errors(e, geo_countries)