Implement basic spotify scraper

pull/1/head
Michael Kuperfish Steinberg 2023-01-05 14:31:44 +02:00
commit e5f146084a
12 zmienionych plików z 1192 dodań i 0 usunięć

75
auto_compressor.py 100644
Wyświetl plik

@ -0,0 +1,75 @@
import os
from zipfile import ZipFile
import time
import sys
class ZipUtilities:
    """Helpers for zipping a mix of files and directories into one archive."""

    def toZip(self, files, filename):
        """Create (or overwrite) archive ``filename`` from an iterable of paths.

        Each entry may be a file (added directly) or a directory
        (added recursively).
        """
        # 'with' guarantees the archive is closed even if a write raises;
        # the original leaked the handle on exceptions.
        with ZipFile(filename, 'w') as zip_file:
            for file in files:
                if os.path.isfile(file):
                    zip_file.write(file)
                else:
                    self.addFolderToZip(zip_file, file)

    def addFolderToZip(self, zip_file, folder):
        """Recursively add every file under ``folder`` to ``zip_file``."""
        print(f'Adding folder {folder} to archive {zip_file}')
        for file in os.listdir(folder):
            full_path = os.path.join(folder, file)
            if os.path.isfile(full_path):
                print('File added: ' + str(full_path))
                zip_file.write(full_path)
            elif os.path.isdir(full_path):
                print('Entering folder: ' + str(full_path))
                self.addFolderToZip(zip_file, full_path)
def get_directory_size(start_path = '.'):
    """Return the total size in bytes of all regular files under ``start_path``.

    Symbolic links are skipped so link targets are not counted twice.
    """
    total = 0
    for root, _dirs, names in os.walk(start_path):
        for name in names:
            path = os.path.join(root, name)
            if os.path.islink(path):
                continue
            total += os.path.getsize(path)
    return total
def get_zip_name() -> str:
    """Build a unique archive path inside the directory given as argv[2].

    Uniqueness comes from the current UNIX timestamp.
    """
    target_dir = sys.argv[2]
    stamp = str(time.time())
    return f'{target_dir}/Download_{stamp}.zip'
def zip_dirs(dir_list: list) -> str:
    """Zip the given directories into a freshly named archive.

    Returns the archive path — the original was annotated ``-> str`` but
    implicitly returned None.
    """
    zname = get_zip_name()
    utilities = ZipUtilities()
    utilities.toZip(dir_list, zname)
    return zname
def zip_bunches(start_path:str='music', max_zip_size:int=3813764863):
    """Group the sub-directories of ``start_path`` into bunches whose total
    size stays under ``max_zip_size`` bytes, zipping each bunch.

    Fixes two bugs in the original: the directory that overflowed a bunch
    was silently dropped (it was never added to the next bunch), and an
    empty trailing bunch produced a needless empty archive.
    """
    current_bunch = []
    current_bunch_size = 0
    for dir_name in os.listdir(start_path):
        dir_path = os.path.join(start_path, dir_name)
        dir_size = get_directory_size(dir_path)
        print(f'Dir: {dir_path} of size {dir_size}')
        if current_bunch_size + dir_size < max_zip_size:
            current_bunch.append(dir_path)
            current_bunch_size += dir_size
        else:
            zip_dirs(current_bunch)
            # Start the next bunch with the directory that did not fit.
            current_bunch = [dir_path]
            current_bunch_size = dir_size
    if current_bunch:  # do not emit an empty archive
        zip_dirs(current_bunch)
# CLI entry point: with no arguments, zip bunches from the default 'music'
# directory; otherwise argv[1] is the source directory (argv[2] is consumed
# later by get_zip_name() as the output directory).
if __name__ == '__main__':
    if len(sys.argv) == 1:
        zip_bunches()
    else:
        zip_bunches(sys.argv[1])

33
config.py 100644
Wyświetl plik

@ -0,0 +1,33 @@
from urllib.parse import urlencode
from http import client
import requests
from requests import Response
from lxml import html
import json
import datetime, time
import hashlib
import eyed3
import os
import shutil
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pydeezer.constants import track_formats
# Browser user agent sent with every request so traffic resembles the web player.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'
# Spotify web-player session cookies, supplied via the environment.
SP_DC = os.getenv('SP_DC')
SP_KEY = os.getenv('SP_KEY')
#PROXY = {"http": "127.0.0.1:8080", "https": "127.0.0.1:8080"}
PROXY = {}  # requests-style proxy mapping; empty means connect directly
VERIFY_SSL = True  # disable only when debugging through an intercepting proxy
FULL_DOWNLOAD_RECURISVE_LIMIT = 0x4000  # max tracks gathered per recursive scrape (16384)
FULL_DOWNLOAD_THREAD_LIMIT = 50  # upper bound on download worker threads
VERBOSE_OUTPUTS = False  # extra per-track progress logging when True
DEFAULT_DOWNLOAD_DIRECTORY = 'music_05-01-2023'
ARTIST_IMAGES_SUB_DIR = '_Artists'  # sub-directory that collects artist cover images
GLOBALS_SAVE_FILE = '_downloaded_store.pkl'  # NOTE(review): written as JSON despite the .pkl extension
# Characters that are illegal in Windows file names.  Path separators '/' are
# deliberately kept so whole paths survive cleaning (as in the original).
_ILLEGAL_PATH_CHARS = str.maketrans('', '', '?"*|\\:><')


def clean_file_path(prompt: str) -> str:
    """Return ``prompt`` with characters illegal in Windows paths removed.

    Single C-level pass via str.translate instead of eight chained
    ``str.replace`` calls.
    """
    return prompt.translate(_ILLEGAL_PATH_CHARS)

21
main.py 100644
Wyświetl plik

@ -0,0 +1,21 @@
from config import *
from webgui import app
import spotify_mass_download
from spotify_mass_download import full_download, save_globals_save_file
from threading import Thread
def main():
    """Start the background state-save thread, then run the web GUI (blocking)."""
    print(f'Spotify Fuzzer')
    print('\n\n\n')
    # A positive g_keep_saving keeps save_globals_save_file() looping; the
    # decrement below lets that thread exit once the GUI stops.
    spotify_mass_download.g_keep_saving += 1
    save_globals_save_file_thread = Thread(target=save_globals_save_file)
    save_globals_save_file_thread.start()
    # Blocks until the Flask app shuts down.
    app.run(host='127.0.0.1', port=8888, debug=False)
    spotify_mass_download.g_keep_saving -= 1


if __name__ == '__main__':
    main()

2
requirements.txt 100644
Wyświetl plik

@ -0,0 +1,2 @@
cryptography
py-deezer

156
spotify_client.py 100644
Wyświetl plik

@ -0,0 +1,156 @@
from config import *
class SpotifyClient:
    """Thin wrapper around Spotify's private web-player API.

    Derives an access token and a client token from the web-player session
    cookies (sp_dc / sp_key) and exposes authenticated get/post helpers
    that mimic the browser's headers.
    """

    # Class-level defaults from config; token fields are overwritten per instance.
    _proxy = PROXY
    _client_token = ''
    _access_token = ''
    _client_id = ''
    __USER_AGENT = USER_AGENT  # name-mangled; only used inside this class
    _verify_ssl = VERIFY_SSL
    user_data = None  # /v1/me response JSON, filled by get_me()

    def __init__(self, sp_dc=None, sp_key=None):
        # Cookies are kept so refresh_tokens() can re-authenticate later.
        self.dc = sp_dc
        self.key = sp_key
        self.get_tokens(sp_dc, sp_key)

    def get_tokens(self, sp_dc=None, sp_key=None):
        """Fetch and cache a fresh access token + client token pair."""
        self._access_token, self._client_id = self.get_access_token(sp_dc=sp_dc, sp_key=sp_key)
        self._client_token = self.get_client_token(self._client_id)
        print('Client token: ', self._client_token)
        print('Access token: ', self._access_token)

    def refresh_tokens(self):
        """Re-run token acquisition with the cookies given at construction."""
        self.get_tokens(self.dc, self.key)

    def get_client_token(self, client_id: str):
        """Exchange the client id for a client token via clienttoken.spotify.com."""
        with requests.session() as session:
            session.proxies = self._proxy
            session.headers = {
                "User-Agent": self.__USER_AGENT,
                "Accept": "application/json",
                "Accept-Language": "en-US,en;q=0.5"
            }
            # Device fields are left empty; the endpoint still grants a token.
            data = {
                "client_data": {
                    "client_version": "",
                    "client_id": client_id,
                    "js_sdk_data":
                    {
                        "device_brand": "",
                        "device_model": "",
                        "os": "",
                        "os_version": ""
                    }
                }
            }
            response = session.post('https://clienttoken.spotify.com/v1/clienttoken', json=data, verify=self._verify_ssl)
            return response.json()['granted_token']['token']

    def get_access_token(self, keys=None, sp_dc=None, sp_key=None):
        """Obtain (access_token, client_id) from open.spotify.com.

        ``keys`` may carry a full cookie jar; sp_dc / sp_key override or
        extend it.  Also records whether the session is anonymous.
        """
        with requests.session() as session:
            session.proxies = self._proxy
            session.headers = {
                'User-Agent': self.__USER_AGENT,
                'Accept': 'application/json',
                'Accept-Language': 'en',
                'Accept-Encoding': 'gzip, deflate',
                'App-Platform': 'WebPlayer',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Referer': 'https://open.spotify.com/',
                'Te': 'trailers'
            }
            cookie = {}
            if keys is not None:
                cookie = keys
            if sp_dc is not None:
                cookie['sp_dc'] = sp_dc
            if sp_key is not None:
                cookie['sp_key'] = sp_key
            response = session.get('https://open.spotify.com/get_access_token', verify=self._verify_ssl, cookies=cookie)
            print('Access token is anon: ', response.json()['isAnonymous'])
            self.is_anonymous = response.json()['isAnonymous']
            return response.json()['accessToken'], response.json()['clientId']

    def get_me(self):
        """Fetch /v1/me, cache it on ``user_data``, and refuse premium accounts."""
        with requests.session() as session:
            session.proxies = self._proxy
            session.headers = {
                'User-Agent': self.__USER_AGENT,
                "Accept-Language": "en-US,en;q=0.5",
                'Accept': 'application/json',
                'Client-Token': self._client_token,
                'Authorization': f'Bearer {self._access_token}',
                'Origin': 'https://open.spotify.com',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Referer': 'https://open.spotify.com/',
                'Te': 'trailers'
            }
            response_json = session.get('https://api.spotify.com/v1/me', verify=self._verify_ssl).json()
            self.user_data = response_json
            # Guard so the tool is never run against a paying account.
            if self.user_data['product'] == 'premium':
                raise Exception('THIS USER IS PREMIUM!')
            return response_json

    def get_premium_keys(self):
        """Scrape a cookie dump from a third-party blog page.

        NOTE(review): this pulls live session cookies from an external,
        untrusted site — fragile and a significant trust/abuse concern.
        """
        page = requests.get('https://www.rkstore.tn/2022/03/spotify-premium-cookies.html', verify=self._verify_ssl)
        root = html.document_fromstring(page.content)
        cookies_element = root.get_element_by_id('download_link')
        cookies = json.loads(cookies_element.text_content())
        prem_keys = {}
        for cookie in cookies:
            prem_keys[cookie['name']] = cookie['value']
        return prem_keys

    def get(self, url: str) -> Response:
        """Authenticated GET with web-player headers; returns the raw Response."""
        with requests.session() as session:
            session.proxies = self._proxy
            session.headers = {
                'User-Agent': self.__USER_AGENT,
                'Accept': 'application/json',
                'Client-Token': self._client_token,
                'Authorization': f'Bearer {self._access_token}',
                'Origin': 'https://open.spotify.com',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Referer': 'https://open.spotify.com/',
                'Te': 'trailers',
                'App-Platform': 'WebPlayer'
            }
            response = session.get(url, verify=self._verify_ssl)
            return response

    def post(self, url: str, payload=None) -> Response:
        """Authenticated POST with web-player headers; returns the raw Response."""
        with requests.session() as session:
            session.proxies = self._proxy
            session.headers = {
                'User-Agent': self.__USER_AGENT,
                'Accept': 'application/json',
                'Client-Token': self._client_token,
                'Authorization': f'Bearer {self._access_token}',
                'Origin': 'https://open.spotify.com',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Referer': 'https://open.spotify.com/',
                'Te': 'trailers',
                'App-Platform': 'WebPlayer'
            }
            response = session.post(url, verify=self._verify_ssl, data=payload)
            return response

Wyświetl plik

@ -0,0 +1,157 @@
from threading import Thread, get_ident
import pickle
from spotify_client import SpotifyClient
from spotify_scraper import SpotifyScraper
from config import *
import base64
from time import sleep
from datetime import datetime
# Module-level singletons shared by every download thread.  get_me() also
# aborts import early if the configured account is premium.
client = SpotifyClient(sp_key=SP_KEY, sp_dc=SP_DC)
client.get_me()
scraper = SpotifyScraper(client=client)
# Cross-thread bookkeeping of what has already been fetched.
g_downloaded_artist_covers = []
g_downloaded_songs = []
# Reference count: while > 0 the background save thread keeps persisting
# the two lists above.
g_keep_saving = 0
class Console:
    """In-memory log buffer of timestamped, CSS-colored entries.

    The web GUI polls get() and renders each entry with its color.
    """

    def __init__(self):
        # Fix: this was a class attribute, so every Console instance
        # shared one global list.  Make the buffer per-instance.
        self.console_output = []

    def log(self, value: str):
        """Plain log line (inherits the page's text color)."""
        self.cout(value, 'inherit')

    def error(self, value: str):
        """Error log line (red)."""
        self.cout(value, 'rgba(255,30,30,0.9)')

    def info(self, value: str):
        """Informational log line (cyan)."""
        self.cout(value, 'rgba(30,255,255,0.9)')

    def happy(self, value: str):
        """Success log line (green)."""
        self.cout(value, 'rgba(30,255,30,0.9)')

    def cout(self, value: str, color: str):
        """Append a timestamped entry with an explicit CSS color."""
        self.console_output.append(
            {
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'value': value,
                'color': color,
            }
        )

    def get(self):
        """Return all entries, oldest first."""
        return self.console_output
# Shared console buffer; the web GUI polls it for colored log lines.
console = Console()
def download_track_list(download_dir: str, track_list: list, recursive_artist: bool=False, recursive_album: bool=False, recursive: bool=False, recursive_limit=1024):
    """Worker: download every track in ``track_list`` into ``download_dir``.

    Grows ``track_list`` in place while iterating (recursive album/artist
    scraping), so the loop can run far past the initial length, bounded by
    ``recursive_limit``.  Shares the g_downloaded_* lists with other worker
    threads without locking — NOTE(review): the check-then-append pattern
    can race between threads, so occasional duplicates are possible.
    """
    global g_downloaded_songs, g_downloaded_artist_covers
    my_thread_id = str(get_ident()).zfill(6)
    artist_images_download_dir = f'{download_dir}/{ARTIST_IMAGES_SUB_DIR}'
    downloaded_count = 0
    for track in track_list:
        try:
            # Tokens expire during long runs; refresh every 20 tracks.
            if downloaded_count % 20 == 0:
                client.refresh_tokens()
            if track.spotify_id in g_downloaded_songs:
                console.info(f'Thread<{my_thread_id}> | Skipping already downloaded song: {track.title}')
                downloaded_count += 1
                continue
            g_downloaded_songs.append(track.spotify_id)
            track_path = f'{download_dir}/{track.artists[0].name}/{track.album.title}/{", ".join([x.name for x in track.artists])} - {track.title} [{track.album.title}].mp3'
            track.download_to_file(scraper, track_path)
            console.happy(f'Thread<{my_thread_id}> | Downloaded: {track_path}')
            # Optionally pull in the rest of this track's album.
            if (recursive_album or recursive) and len(track_list) < recursive_limit:
                new_tracks = scraper.scrape_album_tracks(track.album.spotify_id)
                for new_track in new_tracks:
                    if new_track not in track_list and len(track_list) < recursive_limit:
                        track_list.append(new_track)
                console.log(f'Thread<{my_thread_id}> | Scraped {len(new_tracks)} new songs through recursive album!')
            for artist in track.artists:
                # First encounter of an artist: save their cover image
                # (base64 file name sidesteps illegal path characters).
                if artist.spotify_id not in g_downloaded_artist_covers:
                    try:
                        artist_image = artist.download_image(scraper)
                        artist_name = base64.b64encode(artist.name.encode()).decode()
                        with open(f'{artist_images_download_dir}/{artist_name}.jpg', 'wb') as f:
                            f.write(artist_image)
                    except Exception as ex:
                        console.error(str(ex))
                    g_downloaded_artist_covers.append(artist.spotify_id)
                    # Optionally pull in the artist's top tracks and albums.
                    if (recursive_artist or recursive) and len(track_list) < recursive_limit:
                        old_size = len(track_list)
                        track_list += scraper.scrape_artist_tracks(artist.spotify_id)
                        if recursive_artist:
                            albums = scraper.scrape_artist_albums(artist.spotify_id)
                            for album in albums:
                                track_list += scraper.scrape_album_tracks(album['id'])
                        console.log(f'Thread<{my_thread_id}> | Scraped {len(track_list) - old_size} new songs through recursive artist!')
        except Exception as ex:
            # Best-effort per track: log and keep going.
            console.error(f'Thread<{my_thread_id}> | Exception: {ex}')
        downloaded_count += 1
        if VERBOSE_OUTPUTS:
            console.log(f'Thread<{my_thread_id}> | Processed {downloaded_count} / {len(track_list)}')
def save_globals_save_file():
    """Persist / restore the global "already downloaded" bookkeeping.

    Loads previous state once, then rewrites GLOBALS_SAVE_FILE every 15
    seconds for as long as g_keep_saving is positive.  Despite the .pkl
    file name the format is JSON, with the two lists double-encoded as
    JSON strings inside the outer object.
    """
    global g_keep_saving, g_downloaded_artist_covers, g_downloaded_songs
    try:
        with open(GLOBALS_SAVE_FILE, 'r') as f:
            data = json.loads(f.read())
            g_downloaded_songs = json.loads(data['songs'])
            g_downloaded_artist_covers = json.loads(data['artists'])
    except Exception as ex:
        # A missing/corrupt file on first run is expected; start empty.
        console.error(f'Failed to load globals save file! Exception: {ex}')
    while g_keep_saving > 0:
        with open(GLOBALS_SAVE_FILE, 'w') as f:
            g_downloaded_songs_json = json.dumps(g_downloaded_songs)
            g_downloaded_artist_covers_json = json.dumps(g_downloaded_artist_covers)
            data = {'songs':g_downloaded_songs_json, 'artists': g_downloaded_artist_covers_json }
            f.write( json.dumps(data) )
        if VERBOSE_OUTPUTS:
            console.log('Saved globals file!')
        sleep(15)
def full_download(download_dir: str, identifier: str, recursive_artist: bool=False, recursive_album: bool=False, recursive: bool=False, recursive_limit:int=1024, thread_count:int=5):
    """Scrape every track behind ``identifier`` and download them in parallel.

    The track list is split into ``thread_count`` roughly equal slices;
    the last thread also takes the division remainder.  Blocks until all
    worker threads have finished.
    """
    global g_downloaded_songs, g_downloaded_artist_covers, g_keep_saving
    artist_images_download_dir = f'{download_dir}/{ARTIST_IMAGES_SUB_DIR}'
    os.makedirs(artist_images_download_dir, exist_ok=True)
    os.makedirs(f'temp', exist_ok=True)  # staging area used by download_to_file()
    g_keep_saving += 1  # keep the background save thread alive while we run
    client.refresh_tokens()
    console.log(f'Recieved scrape command on identifier: {identifier}, {recursive=}, {recursive_artist=}, {recursive_album=}, {recursive_limit=}, {thread_count=}')
    track_list = scraper.scrape_tracks(identifier, console=console)
    console.log(f'Scraping on identifier: {identifier} yielded {len(track_list)} tracks!')
    download_threads = []
    thread_subsection_size = int(len(track_list) / thread_count)
    for i in range(thread_count - 1):
        download_threads.append(Thread(target=download_track_list, args=(download_dir, track_list[thread_subsection_size * i : (thread_subsection_size * i) + thread_subsection_size], recursive_artist, recursive_album, recursive, recursive_limit)))
        download_threads[-1].start()
        sleep(0.05)  # stagger thread start-up slightly
    # Final slice runs to the end of the list (picks up the remainder).
    download_threads.append(Thread(target=download_track_list, args=(download_dir, track_list[thread_subsection_size * (thread_count - 1):], recursive_artist, recursive_album, recursive, recursive_limit)))
    download_threads[-1].start()
    [x.join() for x in download_threads]  # wait for every worker
    console.log(f'Comletely done scraping identifier: {identifier}!')
    g_keep_saving -= 1
def download_all_categories_playlists():
    """Walk every Spotify browse category and download all of its playlists.

    Each playlist is first exported as a small JSON sidecar
    (``<id>.playlist``) before its tracks are downloaded.
    """
    client.refresh_tokens()
    os.makedirs(f'{DEFAULT_DOWNLOAD_DIRECTORY}/_Playlists/', exist_ok=True)
    category_ids = scraper.get_categories_ids()
    for category_id in category_ids:
        playlist_ids = scraper.get_category_playlist_ids(category_id)
        for playlist_id in playlist_ids:
            playlist = scraper.get_playlist(playlist_id)
            with open(f'{DEFAULT_DOWNLOAD_DIRECTORY}/_Playlists/{playlist.spotify_id}.playlist', 'w') as f:
                f.write(playlist.export())
            full_download(f'{DEFAULT_DOWNLOAD_DIRECTORY}', identifier=playlist.href)

159
spotify_scraper.py 100644
Wyświetl plik

@ -0,0 +1,159 @@
from concurrent.futures import process
from config import *
from spotify_utils import *
from spotify_client import SpotifyClient
from enum import Enum
class SpotifyScraper:
    """High-level scraping helpers built on top of a SpotifyClient.

    All ``scrape_*`` / ``get_*`` methods hit Spotify's web API and return
    parsed JSON or SpotifyTrack / SpotifyPlaylist wrappers.
    """

    _client = None

    class IDTypes(Enum):
        """Resource type named by a Spotify share link."""
        Playlist = 0
        Album = 1
        Artist = 2
        Track = 3
        Unknown = -1

    def __init__(self, sp_dc=None, sp_key=None, client=None) -> None:
        # Reuse an existing client when given; otherwise authenticate fresh.
        if client is not None:
            self._client = client
        else:
            self._client = SpotifyClient(sp_dc=sp_dc, sp_key=sp_key)

    def identify_link_type(self, link: str) -> IDTypes:
        """Classify a Spotify share link by the resource type it names."""
        lowered = link.lower()
        if 'playlist' in lowered:
            return self.IDTypes.Playlist
        elif 'album' in lowered:
            return self.IDTypes.Album
        elif 'artist' in lowered:
            return self.IDTypes.Artist
        elif 'track' in lowered:
            return self.IDTypes.Track
        return self.IDTypes.Unknown

    def extract_id_from_link(self, link: str) -> str:
        """Return the resource id: the last path component of ``link``.

        Fix: query strings (e.g. the ``?si=...`` share suffix) are stripped
        first; the original returned them glued to the id.
        """
        link = link.split('?', 1)[0]
        return link[link.rindex('/') + 1:]

    def scrape_tracks(self, link: str, console=None) -> list:
        """Scrape all tracks behind a playlist / album / artist link.

        Fix: returns an empty list for track links and unrecognized links —
        the original returned None, which crashed callers taking len().
        """
        id_type = self.identify_link_type(link)
        if id_type == self.IDTypes.Playlist:
            return self.scrape_playlist_tracks(self.extract_id_from_link(link))
        elif id_type == self.IDTypes.Album:
            return self.scrape_album_tracks(self.extract_id_from_link(link))
        elif id_type == self.IDTypes.Artist:
            return self.scrape_artist_tracks(self.extract_id_from_link(link), intense=True, console=console)
        return []

    def scrape_playlist(self, playlist_id: str):
        """Raw playlist JSON."""
        return self._client.get(f'https://api.spotify.com/v1/playlists/{playlist_id}').json()

    def scrape_playlist_tracks(self, playlist_id: str):
        """All tracks of a playlist as SpotifyTrack objects (paginated, 100/page)."""
        offset = 0
        limit = 100
        playlist_data = self._client.get(f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks?offset={offset}&limit={limit}&market=from_token').json()
        tracks = playlist_data['items']
        while playlist_data['next'] is not None:
            offset += limit
            playlist_data = self._client.get(f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks?offset={offset}&limit={limit}&market=from_token').json()
            tracks += playlist_data['items']
        if len(tracks) != int(playlist_data['total']):
            # Fix: the original formatted playlist_data["tracks"]["total"],
            # a key that does not exist on this endpoint's response.
            print(f'Warning: track count does not match! {len(tracks)} != {int(playlist_data["total"])}')
        return [SpotifyTrack(track_data) for track_data in tracks]

    def scrape_album(self, album_id: str):
        """Raw album JSON."""
        return self._client.get(f'https://api.spotify.com/v1/albums/{album_id}').json()

    def scrape_album_tracks(self, album_id: str):
        """All tracks of an album as SpotifyTrack objects (paginated, 50/page).

        The album-tracks payload is abbreviated, so each track is re-fetched
        from its own href to obtain the full object.
        """
        limit = 50
        offset = 0
        ret = self._client.get(f'https://api.spotify.com/v1/albums/{album_id}/tracks?offset={offset}&limit={limit}').json()
        tracks = ret['items']
        while ret['next'] is not None:
            offset += limit
            ret = self._client.get(f'https://api.spotify.com/v1/albums/{album_id}/tracks?offset={offset}&limit={limit}').json()
            tracks += ret['items']
        if len(tracks) != int(ret['total']):
            print(f'Warning: track count does not match! {len(tracks)} != {int(ret["total"])}')
        processed_tracks = []
        for track_data in tracks:
            processed_tracks.append(SpotifyTrack(self.get(track_data['href']).json()))
        return processed_tracks

    def scrape_artist(self, artist_id: str):
        """Raw top-tracks JSON for an artist."""
        return self.get(f'https://api.spotify.com/v1/artists/{artist_id}/top-tracks?market=from_token').json()

    def scrape_artist_albums(self, artist_id: str):
        """All album JSON objects of an artist (paginated, 50/page)."""
        offset = 0
        limit = 50
        albums_data = self.get(f'https://api.spotify.com/v1/artists/{artist_id}/albums?market=from_token&limit={limit}&offset={offset}').json()
        albums = albums_data['items']
        while albums_data['next'] is not None:
            offset += limit
            albums_data = self.get(f'https://api.spotify.com/v1/artists/{artist_id}/albums?market=from_token&limit={limit}&offset={offset}').json()
            albums += albums_data['items']
        return albums

    def scrape_artist_tracks(self, artist_id: str, intense:bool=False, console=None):
        """An artist's top tracks; with ``intense`` also every album track."""
        tracks = self.scrape_artist(artist_id)['tracks']
        try:
            artist_name = tracks[0]['album']['artists'][0]['name']
        except (IndexError, KeyError):
            artist_name = 'Unknown'
        processed_tracks = [SpotifyTrack(track_data) for track_data in tracks]
        if intense:
            albums = self.scrape_artist_albums(artist_id)
            processed_album_count = 0
            for album in albums:
                processed_tracks += self.scrape_album_tracks(album['id'])
                processed_album_count += 1
                if console is not None:
                    console.log(f'Scraping {artist_name}\'s albums: {processed_album_count} / {len(albums)}')
        return processed_tracks

    def get(self, url: str) -> Response:
        """Authenticated GET through the underlying client."""
        return self._client.get(url)

    def post(self, url: str, payload=None) -> Response:
        """Authenticated POST through the underlying client."""
        return self._client.post(url, payload=payload)

    def get_lyrics(self, track_id: str):
        """Lyrics JSON for a track, or '' when the request fails."""
        try:
            return self.get(f'https://spclient.wg.spotify.com/color-lyrics/v2/track/{track_id}').json()
        except Exception:
            return ''

    def get_track_features(self, track_id: str):
        """Audio-features JSON for a track, or '' when the request fails."""
        try:
            return self.get(f'https://api.spotify.com/v1/audio-features/{track_id}').json()
        except Exception:
            return ''

    def get_category_playlist_ids(self, category_id: str, limit=50, offset=0) -> list:
        """Playlist ids of a browse category, up to roughly ``limit``.

        NOTE(review): a page is appended whole, so the result may slightly
        exceed ``limit`` (kept from the original behavior).
        """
        playlist_ids = []
        current_offset = offset
        has_next = True
        while len(playlist_ids) < limit and has_next:
            category_playlists_json = self.get_category_playlists(category_id, limit=50, offset=current_offset)
            has_next = category_playlists_json['playlists']['next'] is not None
            for playlist in category_playlists_json['playlists']['items']:
                playlist_ids.append(playlist['id'])
        return playlist_ids

    def get_category_playlists(self, category_id: str, limit:int=50, offset:int=0) -> dict:
        """One page of a category's playlists."""
        return self.get(f'https://api.spotify.com/v1/browse/categories/{category_id}/playlists/?limit={limit}&offset={offset}').json()

    def get_categories(self, limit=50) -> dict:
        """Raw browse-categories JSON."""
        return self.get(f'https://api.spotify.com/v1/browse/categories/?limit={limit}').json()

    def get_categories_ids(self, limit=50) -> list:
        """Ids of the browse categories.

        Fix: the original ignored its ``limit`` parameter and always used
        get_categories()'s default.
        """
        categories = self.get_categories(limit)
        ids = []
        for category in categories['categories']['items']:
            ids.append(category['id'])
        return ids

    def get_playlist(self, playlist_id: str):
        """A SpotifyPlaylist wrapper with metadata and all tracks loaded."""
        playlist_data = self.get(f'https://api.spotify.com/v1/playlists/{playlist_id}').json()
        tracks = self.scrape_playlist_tracks(playlist_id)
        return SpotifyPlaylist(spotify_id=playlist_id, tracks=tracks, data=playlist_data)

337
spotify_utils.py 100644
Wyświetl plik

@ -0,0 +1,337 @@
from email.mime import audio
import base64
from config import *
class SpotifyAlbum:
    """A Spotify album: title, cover URL, track count, release timestamp, id."""

    # Defaults; load_from_data() overwrites these per instance.
    title = ''
    thumbnail_href = ''
    track_count = 0
    release_date = 0
    spotify_id = ''

    def __init__(self, album_data=None) -> None:
        if album_data is not None:
            self.load_from_data(album_data)

    def load_from_data(self, data):
        """Populate fields from a Spotify album JSON object."""
        self.title = data['name']
        self.thumbnail_href = data['images'][0]['url']
        self.track_count = data['total_tracks']
        self.release_date = self._parse_release_date(data.get('release_date'))
        self.spotify_id = data['id']

    @staticmethod
    def _parse_release_date(raw):
        """Convert Spotify's variable-precision release date to a timestamp.

        Spotify reports 'YYYY-MM-DD', 'YYYY-MM' or 'YYYY' depending on
        release_date_precision.  Fix: the unparseable fallback is now the
        numeric 0 (epoch) — the original stored the string '0000-00-00',
        which crashed datetime.fromtimestamp() during ID3 tagging.
        """
        for fmt in ('%Y-%m-%d', '%Y-%m', '%Y'):
            try:
                return time.mktime(datetime.datetime.strptime(raw, fmt).timetuple())
            except (ValueError, TypeError):
                continue
        return 0

    def __str__(self) -> str:
        return f'SpotifyAlbum< {self.title} >'

    def href(self) -> str:
        """API endpoint for this album."""
        return f'https://api.spotify.com/v1/albums/{self.spotify_id}'
class SpotifyArtist:
    """A minimal artist record: Spotify id plus display name."""

    spotify_id = ''
    name = ''

    def __init__(self, artist_data: None) -> None:
        if artist_data is not None:
            self.load_from_data(artist_data)

    def load_from_data(self, data):
        """Populate the record from a Spotify artist JSON object."""
        self.name = data['name']
        self.spotify_id = data['id']

    def href(self) -> str:
        """API endpoint for this artist."""
        return f'https://api.spotify.com/v1/artists/{self.spotify_id}'

    def __str__(self) -> str:
        return f'SpotifyArtist< {self.name} >'

    def __repr__(self) -> str:
        return str(self)

    def download_image(self, scraper) -> bytes:
        """Fetch the artist's primary image bytes; b'' when no scraper is given."""
        if scraper is None:
            return b''
        images = scraper.get(self.href()).json()['images']
        if not images:
            raise Exception(f'Artist "{self.name}" has no image!')
        return requests.get(images[0]['url']).content
class SpotifyTrack:
    """A single Spotify track plus the plumbing to download its audio.

    Audio is not fetched from Spotify: the track's ISRC is resolved to the
    matching Deezer track, whose encrypted stream is downloaded and
    decrypted (see decrypt_download_data).
    """

    # Defaults; load_from_data() overwrites these per instance.
    title = ''
    spotify_id = ''
    artists = []
    album = None
    thumbnail_href = ''
    release_date = 0
    disc_number = 0
    duration_ms = 0
    explicit = False
    href = ''
    popularity = 0
    audio = b''
    lyrics = ''
    thumnail = b''
    data_dump = ''

    def __init__(self, track_data=None) -> None:
        if track_data is not None:
            self.load_from_data(track_data)

    def load_from_data(self, data):
        """Populate fields from a Spotify track JSON object.

        Accepts either a bare track object or a playlist item wrapping it
        under a 'track' key.
        """
        if 'track' in data:
            data = data['track']
        self.data_dump = data  # raw JSON, later embedded in the ID3 comment tag
        self.album = SpotifyAlbum(data['album'])
        self.title = data['name']
        self.spotify_id = data['id']
        self.artists = [SpotifyArtist(x) for x in data['artists']]
        self.thumbnail_href = self.album.thumbnail_href
        self.release_date = self.album.release_date
        self.track_number = data['track_number']
        self.duration_ms = data['duration_ms']
        self.explicit = data['explicit']
        self.href = data['href']
        self.popularity = data['popularity']
        # The ISRC is the key used to locate the same recording on Deezer.
        self.isrc = data['external_ids']['isrc']

    def __str__(self) -> str:
        return f'SpotifyTrack< {self.title} >'

    def __repr__(self) -> str:
        return self.__str__()

    def get_lyrics(self, scraper) -> str:
        """Fetch lyrics through the scraper; raises if none was provided."""
        if scraper is None:
            raise Exception('SCAPER NOT AVAILABLE!')
        return scraper.get_lyrics(self.spotify_id)

    def download_thumbnail(self, scraper) -> bytes:
        """Download the album-art bytes."""
        return scraper.get(self.thumbnail_href).content

    def get_download_link(self, scraper) -> str:
        """Resolve this track's ISRC to a Deezer CDN download URL."""
        return get_track_download_url(get_deezer_track_data(get_deezer_track_id_from_isrc(self.isrc)))[0]

    def decrypt_download_data(self, content: Response) -> bytes:
        """Decrypt the Deezer audio stream.

        Per the scheme implemented here: every third 2048-byte chunk
        (indices 0, 3, 6, ...) is Blowfish-CBC encrypted with a
        track-specific key and the fixed IV bytes 0..7; the other chunks
        and a short trailing chunk are stored in the clear.
        """
        chunk_size = 2048
        data_iter = content.iter_content(chunk_size)
        i = 0
        decrypted = b''
        blowfish_key = get_blowfish_key(get_deezer_track_id_from_isrc(self.isrc))
        for chunk in data_iter:
            current_chunk_size = len(chunk)
            if i % 3 > 0:
                # Two out of every three chunks are plaintext.
                decrypted += chunk
            elif len(chunk) < chunk_size:
                # Short final chunk is plaintext; stream ends here.
                decrypted += chunk
                break
            else:
                cipher = Cipher(algorithms.Blowfish(blowfish_key),
                                modes.CBC(
                                    bytes([i for i in range(8)])),
                                default_backend())
                decryptor = cipher.decryptor()
                dec_data = decryptor.update(
                    chunk) + decryptor.finalize()
                decrypted += dec_data
                current_chunk_size = len(dec_data)
            i += 1
        return decrypted

    def download(self, scraper) -> bytes:
        """Download and decrypt the full audio; wraps any failure with context."""
        try:
            download_link = self.get_download_link(scraper)
            data = self.decrypt_download_data(requests.get(download_link, headers={'Accept':'*/*'}))
            return data
        except Exception as ex:
            raise Exception(f'Failed to download {self.title} | Exception: {ex}')

    def package_download(self, scraper):
        """Fetch audio, thumbnail and lyrics into instance attributes."""
        self.audio = self.download(scraper)
        self.thumbnail = self.download_thumbnail(scraper)
        self.lyrics = self.get_lyrics(scraper)

    def download_to_file(self, scraper, output_path: str):
        """Download everything, write ID3 tags, and move the MP3 into place.

        The file is first written to a hash-named temp path so parallel
        downloads cannot collide, then moved to the cleaned output path.
        """
        temp_file_path = f'temp/{hashlib.sha1(self.title.encode() + self.album.spotify_id.encode()).hexdigest()}.temp.mp3'
        self.package_download(scraper)
        with open(temp_file_path, 'wb') as f:
            f.write(self.audio)
        audio_file = eyed3.load(temp_file_path)
        audio_file.initTag(version=(2, 4, 0)) # version is important
        audio_file.tag.title = self.title
        audio_file.tag.artist = '/'.join([artist.name for artist in self.artists])
        audio_file.tag.album_artist = '/'.join([artist.name for artist in self.artists])
        audio_file.tag.album = self.album.title
        audio_file.tag.original_release_date = datetime.datetime.fromtimestamp(self.album.release_date).year
        audio_file.tag.track_num = self.track_number
        audio_file.info.time_secs = self.duration_ms / 1000
        # Image type 3 is the ID3 "front cover" slot.
        audio_file.tag.images.set(3, self.thumbnail, 'image/jpeg', u'cover')
        audio_file.tag.lyrics.set(str(self.lyrics))
        audio_file.tag.comments.set('', str(self.data_dump))
        audio_file.tag.save()
        output_path = clean_file_path(output_path)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        shutil.move(temp_file_path, output_path)
class SpotifyPlaylist:
    """A playlist: its id, track list, and a little display metadata."""

    spotify_id = ''
    tracks = []
    image_url = ''
    title = ''
    description = ''

    def __init__(self, spotify_id, tracks:list[SpotifyTrack], data):
        self.spotify_id = spotify_id
        self.tracks = tracks
        self.title = data['name']
        self.description = data['description']
        images = data['images']
        if len(images) > 0:
            self.image_url = images[0]['url']

    def export(self) -> str:
        """Serialize the bare-minimum playlist data (plus cover image) to JSON."""
        cover_bytes = requests.get(self.image_url).content
        payload = {
            'title': self.title,
            'description': self.description,
            'spotify_id': self.spotify_id,
            'image_url': self.image_url,
            'image_b64': base64.b64encode(cover_bytes).decode(),
            'track_ids': [track.spotify_id for track in self.tracks],
        }
        return json.dumps(payload)

    @property
    def href(self):
        """Public open.spotify.com link for this playlist."""
        return f'https://open.spotify.com/playlist/{self.spotify_id}'
def get_deezer_track_id_from_isrc(isrc: str) -> str:
    """Resolve an ISRC to a Deezer track id via Deezer's public API.

    Raises Exception when Deezer has no track for the given ISRC.
    """
    # Fix: the original built a cookies dict here but never sent it; removed.
    try:
        return str(requests.get(f'https://api.deezer.com/2.0/track/isrc:{isrc}').json()['id'])
    except KeyError:
        raise Exception(f'Could not find deezer track by isrc: {isrc}')
def get_deezer_track_data(song_id: str) -> dict:
    """Fetch the Deezer-internal metadata needed to build a download URL.

    NOTE(review): the session cookies and api_token below are hard-coded
    captures of a single Deezer web session; once they expire this call
    will return an error payload and the ['DATA'] lookup will raise.
    """
    cookies = {'dzr_uniq_id': 'dzr_uniq_id_frc3270536fa4e8fd6594415125daa7ba2096811', 'sid': 'fre82a0685d587f159cb7cf0a5f1e8f7aee759d2'}
    resp = requests.post('https://www.deezer.com/ajax/gw-light.php?api_version=1.0&api_token=By7mRaeO.7.UDI6~NtRjcR1whWRStYb4&input=3&method=deezer.pageTrack', data='{"sng_id":"' + song_id +'"}', cookies=cookies)
    track_json = resp.json()
    data = {}
    # md5_origin + media_version + id are exactly the three inputs of the
    # URL derivation in get_track_download_url().
    data['md5_origin'] = track_json['results']['DATA']['MD5_ORIGIN']
    data['media_version'] = track_json['results']['DATA']['media_version'.upper()]
    data['id'] = song_id
    return data
def get_track_download_url(track, **kwargs):
    """Gets and decrypts the download url of the given track in the given quality
    Arguments:
        track {dict} -- Track dictionary, similar to the {info} value that is returned {using get_track()}
    Keyword Arguments:
        quality {str} -- Use values from {constants.track_formats}, will get the default quality if None or an invalid is given. (default: {None})
        fallback {bool} -- Set to True to if you want to use fallback qualities when the given quality is not available. (default: {False})
        renew {bool} -- Will renew the track object (default: {False})
    Raises:
        DownloadLinkDecryptionError: Will be raised if the track dictionary does not have an MD5
        ValueError: Will be raised if valid track argument was given
    Returns:
        str -- Download url
    """
    # Decryption algo got from: https://git.fuwafuwa.moe/toad/ayeBot/src/branch/master/bot.py;
    # and https://notabug.org/deezpy-dev/Deezpy/src/master/deezpy.py
    # Huge thanks!
    # NOTE(review): despite the docstring above, quality/fallback are fixed
    # here (FLAC with fallback enabled) and kwargs only supplies
    # 'fallback_qualities'.  If every fallback fails, control falls off the
    # end and the function implicitly returns None.
    quality = track_formats.FLAC
    fallback = True
    try:
        if not "md5_origin" in track:
            raise Exception(
                "MD5 is needed to decrypt the download link.")
        md5_origin = track["md5_origin"]
        track_id = track["id"]
        media_version = track["media_version"]
    except ValueError:
        raise ValueError(
            "You have passed an invalid argument.")

    def decrypt_url(quality_code):
        # URL payload: md5_origin ¤ quality ¤ id ¤ media_version, prefixed
        # with its own md5 checksum, padded to 80 bytes, then AES-ECB
        # encrypted with a fixed key.  The first md5_origin character
        # selects the CDN host.
        magic_char = "¤"
        step1 = magic_char.join((md5_origin,
                                 str(quality_code),
                                 track_id,
                                 media_version))
        m = hashlib.md5()
        m.update(bytes([ord(x) for x in step1]))
        step2 = m.hexdigest() + magic_char + step1 + magic_char
        step2 = step2.ljust(80, " ")
        cipher = Cipher(algorithms.AES(bytes('jo6aey6haid2Teih', 'ascii')),
                        modes.ECB(), default_backend())
        encryptor = cipher.encryptor()
        step3 = encryptor.update(bytes([ord(x) for x in step2])).hex()
        cdn = track["md5_origin"][0]
        return f'https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}'

    url = decrypt_url(track_formats.TRACK_FORMAT_MAP[quality]["code"])
    # Probe the URL; an empty or failing response triggers the fallbacks.
    res = requests.get(url, stream=True)
    if not fallback or (res.status_code == 200 and int(res.headers["Content-length"]) > 0):
        res.close()
        return (url, quality)
    else:
        if "fallback_qualities" in kwargs:
            fallback_qualities = kwargs["fallback_qualities"]
        else:
            fallback_qualities = track_formats.FALLBACK_QUALITIES
        for key in fallback_qualities:
            url = decrypt_url(
                track_formats.TRACK_FORMAT_MAP[key]["code"])
            res = requests.get(
                url, stream=True)
            if res.status_code == 200 and int(res.headers["Content-length"]) > 0:
                res.close()
                return (url, key)
def get_blowfish_key(track_id):
    """Derive the track-specific Blowfish key used to decrypt Deezer audio.

    Built by XOR-ing the two 16-character halves of the hex md5 of the
    track id with a fixed 16-character secret, position by position.
    """
    secret = 'g4el58wc0zvf9na1'
    id_md5 = hashlib.md5(bytes(map(ord, track_id))).hexdigest()
    return bytes(
        ord(left) ^ ord(right) ^ ord(key_char)
        for left, right, key_char in zip(id_md5[:16], id_md5[16:], secret)
    )

11
spotpy.py 100644
Wyświetl plik

@ -0,0 +1,11 @@
import spotipy
from spotipy.oauth2 import SpotifyOAuth
# Standalone experiment using the official spotipy client: authenticates via
# OAuth (credentials come from spotipy's standard environment variables) and
# prints the current user's saved tracks.
scope = "user-library-read"
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))
results = sp.current_user_saved_tracks()
for idx, item in enumerate(results['items']):
    track = item['track']
    print(idx, track['artists'][0]['name'], "", track['name'])

141
static/css/base.css 100644
Wyświetl plik

@ -0,0 +1,141 @@
/* Full-viewport app shell; inner panes scroll, the page itself never does. */
body {
    width: 100vw;
    height: 100vh;
    overflow: hidden;
}
/* Dark theme: subtle diagonal gradient over a dark fallback color. */
.dark-mode
{
    background-image: linear-gradient(to bottom right, rgba( 50, 50, 50, 0.9 ), rgba(10, 10, 10, 0.9) );
    background-color: rgba(32, 32, 32, 0.3);
    color: rgba(220, 220, 220, 0.95);
}
.center-all
{
    align-items: center;
    text-align: center;
}
.align-bottom
{
    position: absolute;
    bottom: 0;
}
/* Custom slim scrollbar for any .scrollable pane (WebKit browsers only). */
.scrollable::-webkit-scrollbar
{
    width: 10px;
}
.scrollable::-webkit-scrollbar-track
{
    background-color: rgba(240, 240, 240, 0.1);
    border-radius: 2px;
    border-style: solid;
    border-color: rgba(250, 250, 250, 0.2);
}
.scrollable::-webkit-scrollbar-thumb
{
    background-color: rgba(240, 240, 240, 0.5);
    border-radius: 5px;
}
/* Log pane filled by the polling script in the page template. */
#console-output
{
    box-sizing: border-box;
    background-color: rgba(42, 42, 42, 0.3);
    border-radius: 1vw;
    text-align: left;
    padding: 10px;
    max-height: 50vh;
    width: calc(100% - 10px);
    overflow-x: hidden;
    overflow-y: scroll;
}
/* One log line per <p>, separated by a hairline rule. */
#console-output p
{
    border-bottom: solid 1px rgba(220, 220, 220, 0.95);
    padding: 1px;
}
/* Dark-styled form controls. */
input
{
    background-color: rgba(50, 50, 50, 0.7);
    border-color: rgba(220, 220, 220, 0.6);
    border-radius: 5px;
    border-style: ridge;
    color: inherit;
    padding: 1px;
    margin: 2px;
}
input[type="submit"] {
    padding: 3px;
}
#download-form
{
    padding: 10px;
}
/* Label/field pairs laid out on one flex row, labels right-aligned. */
.single-line {
    width: 100%;
    display: flex;
    flex-direction: row;
    flex-wrap: nowrap;
    flex: 1;
    justify-content: space-between;
    align-items: stretch;
    text-align: right;
    margin: 3px;
}
.single-line label {
    margin-right: 5px;
}
form input[type="number"] {
    padding: 2px;
}
/* Fully custom checkbox: native appearance removed, square drawn with
   borders, checkmark rendered by the ::before pseudo-element below. */
form input[type="checkbox"] {
    -webkit-appearance: none;
    appearance: none;
    background-color: #fff;
    margin: 3px;
    margin-right: 5px;
    font: inherit;
    color: currentColor;
    width: 1.15em;
    height: 1.15em;
    border: 0.15em solid currentColor;
    border-radius: 0.15em;
    transform: translateY(-0.075em);
    display: grid;
    place-content: center;
}
/* Checkmark shape (clip-path tick), scaled from 0 to 1 when checked. */
form input[type="checkbox"]::before {
    content: "";
    width: 0.65em;
    height: 0.65em;
    transform: scale(0);
    transition: 120ms transform ease-in-out;
    box-shadow: inset 1em 1em var(--form-control-color);
    background-color: CanvasText;
    transform-origin: center;
    clip-path: polygon(14% 44%, 0 65%, 50% 100%, 100% 16%, 80% 0%, 43% 62%);
}
form input[type="checkbox"]:checked::before {
    transform: scale(1);
}
form input[type="checkbox"]:hover {
    outline: max(2px, 0.15em) solid currentColor;
    outline-offset: max(2px, 0.15em);
}

Wyświetl plik

@ -0,0 +1,51 @@
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" href="/static/css/base.css">
    <title>Spotifile</title>
    <script>
        // Poll the server once a second for console log lines added since
        // `offset`, append them to the on-page console, and keep it
        // scrolled to the bottom.
        let offset = 0;
        const interval = setInterval(async function() {
            const response = await fetch(`/info/console/?offset=${offset}`);
            const data = await response.json();
            offset = data['offset'];
            for (let i = 0; i < data['logs'].length; ++i)
            {
                let log_line = document.createElement('p');
                log_line.innerHTML = `${data['logs'][i]['time']} | ${data['logs'][i]['value']}`;
                log_line.style.color = data['logs'][i]['color'];
                document.getElementById('console-output').appendChild(log_line);
            }
            // Oversized value is clamped by the browser: scrolls to bottom.
            document.getElementById('console-output').scrollTop = 999999999999999999999999;
        }, 1000);
    </script>
</head>
<body class="dark-mode center-all">
    <div id="download-selector">
        <!-- Posts into the hidden iframe so the page itself never navigates. -->
        <form id="download-form" action="/actions/download/" method="POST" target="dummy-frame">
            <input type="text" id="flink" name="flink" placeholder="https://open.spotify.com/..." style="width:30em;">
            <input type="submit" value="Download"/><br>
            <div style="display:flex;justify-content:center;"><div style="display:flex;flex-direction:column;width:fit-content;">
                <div class="single-line"><label>Recursive: </label><input type="checkbox" name="recursive"></div>
                <div class="single-line"><label>Recursive Album: </label><input type="checkbox" name="recursive-album"></div>
                <div class="single-line"><label>Recursive Artist: </label><input type="checkbox" name="recursive-artist"></div>
                <div class="single-line"><label>Recursive Limit: </label><input type="number" name="recursive-limit" value="1024"></div>
                <div class="single-line"><label>Thread Limit: </label><input type="number" name="thread-count" value="5"></div>
            </div></div>
        </form>
    </div>
    <div id="console-output" class="align-bottom scrollable">
    </div>
    <iframe id="dummy-frame" name="dummy-frame" style="display:none;"></iframe>
</body>
</html>

49
webgui.py 100644
Wyświetl plik

@ -0,0 +1,49 @@
from config import *
from pydoc import render_doc
from flask import Flask, render_template, request, jsonify
from spotify_mass_download import full_download, console, download_all_categories_playlists
app = Flask(__name__)
# Re-render Jinja templates on change without restarting the dev server.
app.config['TEMPLATES_AUTO_RELOAD'] = True
@app.route('/')
def index():
    """Serve the single-page UI (download form + live console)."""
    return render_template('index.html')
@app.route('/actions/download/', methods=['POST'])
def actions_download():
    """Start a download described by the submitted form.

    Form fields: 'flink' (a Spotify URL), checkbox flags 'recursive',
    'recursive-artist', 'recursive-album' (submitted as 'on' when ticked,
    absent otherwise), and numeric 'recursive-limit' / 'thread-count',
    both clamped to the limits imported from config.

    Returns 'success', or the exception text (rendered in the page's
    hidden iframe) on failure.
    """
    try:
        spotify_url = request.form.get('flink')
        # HTML checkboxes submit the literal string 'on' when ticked and
        # are omitted from the form entirely otherwise.
        recursive = request.form.get('recursive') == 'on'
        recursive_artist = request.form.get('recursive-artist') == 'on'
        recursive_album = request.form.get('recursive-album') == 'on'
        # Apply the default *before* int(): the previous
        # `int(request.form.get(...)) or default` raised TypeError on a
        # missing field, so the default was unreachable.
        recursive_limit = min(int(request.form.get('recursive-limit') or 1024), FULL_DOWNLOAD_RECURISVE_LIMIT)
        thread_count = min(int(request.form.get('thread-count') or 5), FULL_DOWNLOAD_THREAD_LIMIT)
        full_download(DEFAULT_DOWNLOAD_DIRECTORY, spotify_url, recursive=recursive, recursive_artist=recursive_artist, recursive_album=recursive_album, recursive_limit=recursive_limit, thread_count=thread_count)
        return 'success'
    except Exception as ex:
        # Surface the error text to the page instead of a 500.
        return str(ex)
@app.route('/actions/download/categories')
def actions_download_categories():
    """Download the playlists of every category.

    Returns an acknowledgement string: Flask raises an error when a view
    returns None, so the previous version 500'd even on success.
    """
    download_all_categories_playlists()
    return 'success'
@app.route('/info/console/')
def info_console():
    """Return console log lines added since the 'offset' query parameter.

    Response JSON: {'logs': [...new entries...], 'offset': total count},
    where the returned offset is echoed back by the client on its next poll.
    """
    offset = request.args.get('offset')
    # The client's first poll may send the literal string 'undefined', and a
    # hand-made request may omit the parameter (None); previously only the
    # former was handled and int(None) raised TypeError.
    if offset is None or offset == 'undefined':
        offset = 0
    offset = int(offset)
    logs = console.get()
    return jsonify( {'logs': logs[offset:], 'offset': len(logs)} )
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug reloader and interactive
    # debugger — development only; do not expose this server publicly.
    app.run(debug=True)