From e8bbed1b3582acea8ffb721c8474956e735a7314 Mon Sep 17 00:00:00 2001 From: Michael Kuperfish Steinberg <36902556+Michael-K-Stein@users.noreply.github.com> Date: Fri, 20 Jan 2023 14:11:45 +0200 Subject: [PATCH] Refactor util classes & functions --- config.py | 3 - exceptions.py | 4 + spotify_mass_download.py | 1 + spotify_scraper.py | 9 +- spotify_utils.py | 379 -------------------------------------- utils/deezer_utils.py | 147 +++++++++++++++ utils/spotify_album.py | 36 ++++ utils/spotify_artist.py | 33 ++++ utils/spotify_category.py | 38 ++++ utils/spotify_playlist.py | 44 +++++ utils/spotify_track.py | 109 +++++++++++ utils/utils.py | 4 + 12 files changed, 421 insertions(+), 386 deletions(-) delete mode 100644 spotify_utils.py create mode 100644 utils/deezer_utils.py create mode 100644 utils/spotify_album.py create mode 100644 utils/spotify_artist.py create mode 100644 utils/spotify_category.py create mode 100644 utils/spotify_playlist.py create mode 100644 utils/spotify_track.py create mode 100644 utils/utils.py diff --git a/config.py b/config.py index dba8f49..e4fa2d0 100644 --- a/config.py +++ b/config.py @@ -22,6 +22,3 @@ PROXY = {} VERIFY_SSL = True settings = Settings() - -def clean_file_path(prompt: str): - return prompt.replace('/', '').replace('?', '').replace('"', '').replace('*', '').replace('|', '').replace('\\', '').replace(':', '').replace(';', '').replace('>', '').replace('<', '') diff --git a/exceptions.py b/exceptions.py index 28a45fe..1847221 100644 --- a/exceptions.py +++ b/exceptions.py @@ -14,3 +14,7 @@ class SpotifyTrackException(SpotiFileException): class SpotifyArtistException(SpotiFileException): pass + + +class DeezerException(SpotiFileException): + pass diff --git a/spotify_mass_download.py b/spotify_mass_download.py index fdf22b7..d4f3f06 100644 --- a/spotify_mass_download.py +++ b/spotify_mass_download.py @@ -7,6 +7,7 @@ import base64 from time import sleep from datetime import datetime import random +from utils.utils import clean_file_path client = SpotifyClient(sp_key=SP_KEY, sp_dc=SP_DC) client.get_me() diff --git a/spotify_scraper.py b/spotify_scraper.py index 53b7897..771eff0 100644 --- a/spotify_scraper.py +++ b/spotify_scraper.py @@ -1,8 +1,9 @@ -from concurrent.futures import process -from config import * -from spotify_utils import * -from spotify_client import SpotifyClient from enum import Enum +from config import * +from utils.spotify_track import SpotifyTrack +from utils.spotify_playlist import SpotifyPlaylist +from utils.spotify_category import SpotifyCategory +from spotify_client import SpotifyClient class SpotifyScraper: diff --git a/spotify_utils.py b/spotify_utils.py deleted file mode 100644 index 686e061..0000000 --- a/spotify_utils.py +++ /dev/null @@ -1,379 +0,0 @@ -from email.mime import audio -import base64 -from config import * -from exceptions import SpotifyTrackException, SpotifyArtistException - - -class SpotifyCategory: - name = '' - spotify_id = '' - playlist_ids = '' - thumbnail_href = '' - - def __init__(self, category_data=None): - self.name = category_data['name'] - self.spotify_id = category_data['id'] - if len(category_data['icons']) > 0: - self.thumbnail_href = category_data['icons'][0]['url'] - - def download_metadata(self, scraper): - - thumbail_b64 = '' - if self.thumbnail_href: - thumbail_b64 = base64.b64encode( requests.get(self.thumbnail_href).content ).decode() - - try: - self.playlist_ids = scraper.get_category_playlist_ids(category_id=self.spotify_id) - except: - self.playlist_ids = [] - - data = { - 'name': self.name, - 'spotify_id': self.spotify_id, - 'thumbnail_b64': thumbail_b64, - 'playlist_ids': self.playlist_ids, - } - - with open(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.CATEGORY_METADATA_SUB_DIR}/{self.spotify_id}.category', 'w') as f: - f.write(json.dumps(data)) - - -class SpotifyAlbum: - title = '' - thumbnail_href = '' - track_count = 0 - release_date = 0 - spotify_id = '' - - def __init__(self, album_data=None) -> None: - if album_data is not None: - self.load_from_data(album_data) - - def load_from_data(self, data): - self.title = data['name'] - self.thumbnail_href = data['images'][0]['url'] - self.track_count = data['total_tracks'] - try: - self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y-%m-%d").timetuple()) - except: - try: - self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y-%m").timetuple()) - except: - try: - self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y").timetuple()) - except: - self.release_date = '0000-00-00' - self.spotify_id = data['id'] - - def __str__(self) -> str: - return f'SpotifyAlbum< {self.title} >' - - def href(self) -> str: - return f'https://api.spotify.com/v1/albums/{self.spotify_id}' - - -class SpotifyArtist: - spotify_id = '' - name = '' - - def __init__(self, artist_data: None) -> None: - if artist_data is not None: - self.load_from_data(artist_data) - - def load_from_data(self, data): - self.spotify_id = data['id'] - self.name = data['name'] - - def href(self) -> str: - return f'https://api.spotify.com/v1/artists/{self.spotify_id}' - - def __str__(self) -> str: - return f'SpotifyArtist< {self.name} >' - - def __repr__(self) -> str: - return self.__str__() - - def download_image(self, scraper) -> bytes: - if scraper is None: - return b'' - artist_images = scraper.get(self.href()).json()['images'] - if len(artist_images) == 0: - raise SpotifyArtistException(f'Artist "{self.name}" has no image!') - image_response = requests.get(artist_images[0]['url']) - return image_response.content - - -class SpotifyTrack: - title = '' - spotify_id = '' - artists = [] - album = None - thumbnail_href = '' - release_date = 0 - disc_number = 0 - duration_ms = 0 - explicit = False - href = '' - popularity = 0 - audio = b'' - lyrics = '' - thumnail = b'' - data_dump = '' - - def __init__(self, track_data=None) -> None: - if track_data is not None: - self.load_from_data(track_data) - - def load_from_data(self, data): - if 'track' in data: - data = data['track'] - self.data_dump = data - self.album = SpotifyAlbum(data['album']) - self.title = data['name'] - self.spotify_id = data['id'] - self.artists = [SpotifyArtist(x) for x in data['artists']] - self.thumbnail_href = self.album.thumbnail_href - self.release_date = self.album.release_date - self.track_number = data['track_number'] - self.duration_ms = data['duration_ms'] - self.explicit = data['explicit'] - self.href = data['href'] - self.popularity = data['popularity'] - self.isrc = data['external_ids']['isrc'] - - def __str__(self) -> str: - return f'SpotifyTrack< {self.title} >' - - def __repr__(self) -> str: - return self.__str__() - - def get_lyrics(self, scraper) -> str: - if scraper is None: - raise SpotifyTrackException('SCAPER NOT AVAILABLE!') - return scraper.get_lyrics(self.spotify_id) - - def download_thumbnail(self, scraper) -> bytes: - return scraper.get(self.thumbnail_href).content - - def get_download_link(self, scraper) -> str: - return get_track_download_url(get_deezer_track_data(get_deezer_track_id_from_isrc(self.isrc)))[0] - - def decrypt_download_data(self, content: Response) -> bytes: - chunk_size = 2048 - data_iter = content.iter_content(chunk_size) - i = 0 - decrypted = b'' - blowfish_key = get_blowfish_key(get_deezer_track_id_from_isrc(self.isrc)) - for chunk in data_iter: - current_chunk_size = len(chunk) - - if i % 3 > 0: - decrypted += chunk - elif len(chunk) < chunk_size: - decrypted += chunk - break - else: - cipher = Cipher(algorithms.Blowfish(blowfish_key), - modes.CBC( - bytes([i for i in range(8)])), - default_backend()) - - decryptor = cipher.decryptor() - dec_data = decryptor.update( - chunk) + decryptor.finalize() - decrypted += dec_data - - current_chunk_size = len(dec_data) - - i += 1 - return decrypted - - def download(self, scraper) -> bytes: - try: - download_link = self.get_download_link(scraper) - data = self.decrypt_download_data(requests.get(download_link, headers={'Accept':'*/*'})) - return data - except Exception as ex: - raise SpotifyTrackException(f'Failed to download {self.title} | Exception: {ex}') - - def package_download(self, scraper): - self.audio = self.download(scraper) - self.thumbnail = self.download_thumbnail(scraper) - self.lyrics = self.get_lyrics(scraper) - - def preview_title(self): - return f'{", ".join([x.name for x in self.artists])} - {self.title} [{self.album.title}]' - - def download_to_file(self, scraper, output_path: str): - temp_file_path = f'temp/{hashlib.sha1(self.title.encode() + self.album.spotify_id.encode()).hexdigest()}.temp.mp3' - self.package_download(scraper) - with open(temp_file_path, 'wb') as f: - f.write(self.audio) - - audio_file = eyed3.load(temp_file_path) - audio_file.initTag(version=(2, 4, 0)) # version is important - audio_file.tag.title = self.title - audio_file.tag.artist = '/'.join([artist.name for artist in self.artists]) - audio_file.tag.album_artist = '/'.join([artist.name for artist in self.artists]) - audio_file.tag.album = self.album.title - audio_file.tag.original_release_date = datetime.datetime.fromtimestamp(self.album.release_date).year - audio_file.tag.track_num = self.track_number - audio_file.info.time_secs = self.duration_ms / 1000 - audio_file.tag.images.set(3, self.thumbnail, 'image/jpeg', u'cover') - audio_file.tag.lyrics.set(str(self.lyrics)) - audio_file.tag.comments.set('', str(self.data_dump)) - - audio_file.tag.save() - - full_output_path = output_path + '/' + clean_file_path(self.preview_title()) + '.mp3' - os.makedirs(os.path.dirname(full_output_path), exist_ok=True) - shutil.move(temp_file_path, full_output_path) - - -class SpotifyPlaylist: - spotify_id = '' - tracks = [] - image_url = '' - title = '' - description = '' - - def __init__(self, spotify_id, tracks:list[SpotifyTrack], data): - self.spotify_id = spotify_id - self.tracks = tracks - self.title = data['name'] - self.description = data['description'] - if len(data['images']) > 0: - self.image_url = data['images'][0]['url'] - - def export(self) -> str: - """ Returns a simple json object with the bare minimum playlist data """ - image_data = requests.get(self.image_url).content - data = { - 'title': self.title, - 'description': self.description, - 'spotify_id': self.spotify_id, - 'image_url': self.image_url, - 'image_b64': base64.b64encode(image_data).decode(), - 'track_ids': [track.spotify_id for track in self.tracks] - } - return json.dumps(data) - - def export_to_file(self) -> None: - os.makedirs(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.PLAYLIST_METADATA_SUB_DIR}/', exist_ok=True) - with open(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.PLAYLIST_METADATA_SUB_DIR}/{self.spotify_id}.playlist', 'w') as f: - f.write(self.export()) - - @property - def href(self): - return f'https://open.spotify.com/playlist/{self.spotify_id}' - -def get_deezer_track_id_from_isrc(isrc: str) -> str: - try: - cookies = {'dzr_uniq_id': 'dzr_uniq_id_frc3270536fa4e8fd6594415125daa7ba2096811'} - return str(requests.get(f'https://api.deezer.com/2.0/track/isrc:{isrc}').json()['id']) - except KeyError: - raise Exception(f'Could not find deezer track by isrc: {isrc}') - - -def get_deezer_track_data(song_id: str) -> dict: - cookies = {'dzr_uniq_id': 'dzr_uniq_id_frc3270536fa4e8fd6594415125daa7ba2096811', 'sid': 'fre82a0685d587f159cb7cf0a5f1e8f7aee759d2'} - resp = requests.post('https://www.deezer.com/ajax/gw-light.php?api_version=1.0&api_token=By7mRaeO.7.UDI6~NtRjcR1whWRStYb4&input=3&method=deezer.pageTrack', data='{"sng_id":"' + song_id +'"}', cookies=cookies) - track_json = resp.json() - data = {} - data['md5_origin'] = track_json['results']['DATA']['MD5_ORIGIN'] - data['media_version'] = track_json['results']['DATA']['media_version'.upper()] - data['id'] = song_id - return data - - -def get_track_download_url(track, **kwargs): - """Gets and decrypts the download url of the given track in the given quality - Arguments: - track {dict} -- Track dictionary, similar to the {info} value that is returned {using get_track()} - Keyword Arguments: - quality {str} -- Use values from {constants.track_formats}, will get the default quality if None or an invalid is given. (default: {None}) - fallback {bool} -- Set to True to if you want to use fallback qualities when the given quality is not available. (default: {False}) - renew {bool} -- Will renew the track object (default: {False}) - Raises: - DownloadLinkDecryptionError: Will be raised if the track dictionary does not have an MD5 - ValueError: Will be raised if valid track argument was given - Returns: - str -- Download url - """ - - # Decryption algo got from: https://git.fuwafuwa.moe/toad/ayeBot/src/branch/master/bot.py; - # and https://notabug.org/deezpy-dev/Deezpy/src/master/deezpy.py - # Huge thanks! - - quality = track_formats.FLAC - fallback = True - - try: - if not "md5_origin" in track: - raise Exception( - "MD5 is needed to decrypt the download link.") - - md5_origin = track["md5_origin"] - track_id = track["id"] - media_version = track["media_version"] - except ValueError: - raise ValueError( - "You have passed an invalid argument.") - - def decrypt_url(quality_code): - magic_char = "¤" - step1 = magic_char.join((md5_origin, - str(quality_code), - track_id, - media_version)) - m = hashlib.md5() - m.update(bytes([ord(x) for x in step1])) - - step2 = m.hexdigest() + magic_char + step1 + magic_char - step2 = step2.ljust(80, " ") - - cipher = Cipher(algorithms.AES(bytes('jo6aey6haid2Teih', 'ascii')), - modes.ECB(), default_backend()) - - encryptor = cipher.encryptor() - step3 = encryptor.update(bytes([ord(x) for x in step2])).hex() - - cdn = track["md5_origin"][0] - - return f'https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}' - - url = decrypt_url(track_formats.TRACK_FORMAT_MAP[quality]["code"]) - res = requests.get(url, stream=True) - - if not fallback or (res.status_code == 200 and int(res.headers["Content-length"]) > 0): - res.close() - return (url, quality) - else: - if "fallback_qualities" in kwargs: - fallback_qualities = kwargs["fallback_qualities"] - else: - fallback_qualities = track_formats.FALLBACK_QUALITIES - - for key in fallback_qualities: - url = decrypt_url( - track_formats.TRACK_FORMAT_MAP[key]["code"]) - - res = requests.get( - url, stream=True) - - if res.status_code == 200 and int(res.headers["Content-length"]) > 0: - res.close() - return (url, key) - - -def get_blowfish_key(track_id): - secret = 'g4el58wc0zvf9na1' - - m = hashlib.md5() - m.update(bytes([ord(x) for x in track_id])) - id_md5 = m.hexdigest() - - blowfish_key = bytes(([(ord(id_md5[i]) ^ ord(id_md5[i+16]) ^ ord(secret[i])) - for i in range(16)])) - - return blowfish_key diff --git a/utils/deezer_utils.py b/utils/deezer_utils.py new file mode 100644 index 0000000..417d513 --- /dev/null +++ b/utils/deezer_utils.py @@ -0,0 +1,147 @@ +from config import * +from exceptions import DeezerException + + +class Deezer: + _cookies = {'dzr_uniq_id': 'dzr_uniq_id_frc3270536fa4e8fd6594415125daa7ba2096811', 'sid': 'fre82a0685d587f159cb7cf0a5f1e8f7aee759d2'} + + @staticmethod + def get_track_id_from_isrc(isrc: str) -> str: + try: + return str(requests.get(f'https://api.deezer.com/2.0/track/isrc:{isrc}').json()['id']) + except KeyError: + raise DeezerException(f'Could not find deezer track by isrc: {isrc}') + + @staticmethod + def get_track_data(song_id: str) -> dict: + resp = requests.post('https://www.deezer.com/ajax/gw-light.php?api_version=1.0&api_token=By7mRaeO.7.UDI6~NtRjcR1whWRStYb4&input=3&method=deezer.pageTrack', data='{"sng_id":"' + song_id +'"}', cookies=Deezer._cookies) + track_json = resp.json() + data = {} + data['md5_origin'] = track_json['results']['DATA']['MD5_ORIGIN'] + data['media_version'] = track_json['results']['DATA']['media_version'.upper()] + data['id'] = song_id + return data + + @staticmethod + def get_track_download_url(track, **kwargs): + """Gets and decrypts the download url of the given track in the given quality + Arguments: + track {dict} -- Track dictionary, similar to the {info} value that is returned {using get_track()} + Keyword Arguments: + quality {str} -- Use values from {constants.track_formats}, will get the default quality if None or an invalid is given. (default: {None}) + fallback {bool} -- Set to True to if you want to use fallback qualities when the given quality is not available. (default: {False}) + renew {bool} -- Will renew the track object (default: {False}) + Raises: + DownloadLinkDecryptionError: Will be raised if the track dictionary does not have an MD5 + ValueError: Will be raised if valid track argument was given + Returns: + str -- Download url + """ + + # Decryption algo got from: https://git.fuwafuwa.moe/toad/ayeBot/src/branch/master/bot.py; + # and https://notabug.org/deezpy-dev/Deezpy/src/master/deezpy.py + # Huge thanks! + + quality = track_formats.FLAC + fallback = True + + try: + if not "md5_origin" in track: + raise Exception( + "MD5 is needed to decrypt the download link.") + + md5_origin = track["md5_origin"] + track_id = track["id"] + media_version = track["media_version"] + except ValueError: + raise ValueError( + "You have passed an invalid argument.") + + def decrypt_url(quality_code): + magic_char = "¤" + step1 = magic_char.join((md5_origin, + str(quality_code), + track_id, + media_version)) + m = hashlib.md5() + m.update(bytes([ord(x) for x in step1])) + + step2 = m.hexdigest() + magic_char + step1 + magic_char + step2 = step2.ljust(80, " ") + + cipher = Cipher(algorithms.AES(bytes('jo6aey6haid2Teih', 'ascii')), + modes.ECB(), default_backend()) + + encryptor = cipher.encryptor() + step3 = encryptor.update(bytes([ord(x) for x in step2])).hex() + + cdn = track["md5_origin"][0] + + return f'https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}' + + url = decrypt_url(track_formats.TRACK_FORMAT_MAP[quality]["code"]) + res = requests.get(url, stream=True) + + if not fallback or (res.status_code == 200 and int(res.headers["Content-length"]) > 0): + res.close() + return (url, quality) + else: + if "fallback_qualities" in kwargs: + fallback_qualities = kwargs["fallback_qualities"] + else: + fallback_qualities = track_formats.FALLBACK_QUALITIES + + for key in fallback_qualities: + url = decrypt_url( + track_formats.TRACK_FORMAT_MAP[key]["code"]) + + res = requests.get( + url, stream=True) + + if res.status_code == 200 and int(res.headers["Content-length"]) > 0: + res.close() + return (url, key) + + @staticmethod + def get_blowfish_key(track_id): + secret = 'g4el58wc0zvf9na1' + + m = hashlib.md5() + m.update(bytes([ord(x) for x in track_id])) + id_md5 = m.hexdigest() + + blowfish_key = bytes(([(ord(id_md5[i]) ^ ord(id_md5[i+16]) ^ ord(secret[i])) + for i in range(16)])) + + return blowfish_key + + @staticmethod + def decrypt_download_data(content: Response, isrc: str) -> bytes: + chunk_size = 2048 + data_iter = content.iter_content(chunk_size) + i = 0 + decrypted = b'' + blowfish_key = Deezer.get_blowfish_key(Deezer.get_track_id_from_isrc(isrc)) + for chunk in data_iter: + current_chunk_size = len(chunk) + + if i % 3 > 0: + decrypted += chunk + elif len(chunk) < chunk_size: + decrypted += chunk + break + else: + cipher = Cipher(algorithms.Blowfish(blowfish_key), + modes.CBC( + bytes([i for i in range(8)])), + default_backend()) + + decryptor = cipher.decryptor() + dec_data = decryptor.update( + chunk) + decryptor.finalize() + decrypted += dec_data + + current_chunk_size = len(dec_data) + + i += 1 + return decrypted diff --git a/utils/spotify_album.py b/utils/spotify_album.py new file mode 100644 index 0000000..47ba26d --- /dev/null +++ b/utils/spotify_album.py @@ -0,0 +1,36 @@ +import datetime +import time + + +class SpotifyAlbum: + title = '' + thumbnail_href = '' + track_count = 0 + release_date = 0 + spotify_id = '' + + def __init__(self, album_data=None) -> None: + if album_data is not None: + self.load_from_data(album_data) + + def load_from_data(self, data): + self.title = data['name'] + self.thumbnail_href = data['images'][0]['url'] + self.track_count = data['total_tracks'] + try: + self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y-%m-%d").timetuple()) + except: + try: + self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y-%m").timetuple()) + except: + try: + self.release_date = time.mktime(datetime.datetime.strptime(data['release_date'], "%Y").timetuple()) + except: + self.release_date = '0000-00-00' + self.spotify_id = data['id'] + + def __str__(self) -> str: + return f'SpotifyAlbum< {self.title} >' + + def href(self) -> str: + return f'https://api.spotify.com/v1/albums/{self.spotify_id}' diff --git a/utils/spotify_artist.py b/utils/spotify_artist.py new file mode 100644 index 0000000..619d0b7 --- /dev/null +++ b/utils/spotify_artist.py @@ -0,0 +1,33 @@ +import requests +from exceptions import SpotifyArtistException + + +class SpotifyArtist: + spotify_id = '' + name = '' + + def __init__(self, artist_data: None) -> None: + if artist_data is not None: + self.load_from_data(artist_data) + + def load_from_data(self, data): + self.spotify_id = data['id'] + self.name = data['name'] + + def href(self) -> str: + return f'https://api.spotify.com/v1/artists/{self.spotify_id}' + + def __str__(self) -> str: + return f'SpotifyArtist< {self.name} >' + + def __repr__(self) -> str: + return self.__str__() + + def download_image(self, scraper) -> bytes: + if scraper is None: + return b'' + artist_images = scraper.get(self.href()).json()['images'] + if len(artist_images) == 0: + raise SpotifyArtistException(f'Artist "{self.name}" has no image!') + image_response = requests.get(artist_images[0]['url']) + return image_response.content diff --git a/utils/spotify_category.py b/utils/spotify_category.py new file mode 100644 index 0000000..3918beb --- /dev/null +++ b/utils/spotify_category.py @@ -0,0 +1,38 @@ +import requests +import base64 +import json +from config import settings + + +class SpotifyCategory: + name = '' + spotify_id = '' + playlist_ids = '' + thumbnail_href = '' + + def __init__(self, category_data=None): + self.name = category_data['name'] + self.spotify_id = category_data['id'] + if len(category_data['icons']) > 0: + self.thumbnail_href = category_data['icons'][0]['url'] + + def download_metadata(self, scraper): + + thumbail_b64 = '' + if self.thumbnail_href: + thumbail_b64 = base64.b64encode( requests.get(self.thumbnail_href).content ).decode() + + try: + self.playlist_ids = scraper.get_category_playlist_ids(category_id=self.spotify_id) + except: + self.playlist_ids = [] + + data = { + 'name': self.name, + 'spotify_id': self.spotify_id, + 'thumbnail_b64': thumbail_b64, + 'playlist_ids': self.playlist_ids, + } + + with open(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.CATEGORY_METADATA_SUB_DIR}/{self.spotify_id}.category', 'w') as f: + f.write(json.dumps(data)) diff --git a/utils/spotify_playlist.py b/utils/spotify_playlist.py new file mode 100644 index 0000000..11eb9ec --- /dev/null +++ b/utils/spotify_playlist.py @@ -0,0 +1,44 @@ +import base64 +import json +import requests +import os +from config import settings +from utils.spotify_track import SpotifyTrack + + +class SpotifyPlaylist: + spotify_id = '' + tracks = [] + image_url = '' + title = '' + description = '' + + def __init__(self, spotify_id, tracks:list[SpotifyTrack], data): + self.spotify_id = spotify_id + self.tracks = tracks + self.title = data['name'] + self.description = data['description'] + if len(data['images']) > 0: + self.image_url = data['images'][0]['url'] + + def export(self) -> str: + """ Returns a simple json object with the bare minimum playlist data """ + image_data = requests.get(self.image_url).content + data = { + 'title': self.title, + 'description': self.description, + 'spotify_id': self.spotify_id, + 'image_url': self.image_url, + 'image_b64': base64.b64encode(image_data).decode(), + 'track_ids': [track.spotify_id for track in self.tracks] + } + return json.dumps(data) + + def export_to_file(self) -> None: + os.makedirs(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.PLAYLIST_METADATA_SUB_DIR}/', exist_ok=True) + with open(f'{settings.DEFAULT_DOWNLOAD_DIRECTORY}/{settings.PLAYLIST_METADATA_SUB_DIR}/{self.spotify_id}.playlist', 'w') as f: + f.write(self.export()) + + @property + def href(self): + return f'https://open.spotify.com/playlist/{self.spotify_id}' diff --git a/utils/spotify_track.py b/utils/spotify_track.py new file mode 100644 index 0000000..a117467 --- /dev/null +++ b/utils/spotify_track.py @@ -0,0 +1,109 @@ +import eyed3 +import requests +from requests import Response +import hashlib +import datetime +import os +import shutil +from utils.spotify_album import SpotifyAlbum +from utils.spotify_artist import SpotifyArtist +from utils.deezer_utils import Deezer +from utils.utils import clean_file_path +from exceptions import SpotifyTrackException + + +class SpotifyTrack: + title = '' + spotify_id = '' + artists = [] + album = None + thumbnail_href = '' + release_date = 0 + disc_number = 0 + duration_ms = 0 + explicit = False + href = '' + popularity = 0 + audio = b'' + lyrics = '' + thumnail = b'' + data_dump = '' + + def __init__(self, track_data=None) -> None: + if track_data is not None: + self.load_from_data(track_data) + + def load_from_data(self, data): + if 'track' in data: + data = data['track'] + self.data_dump = data + self.album = SpotifyAlbum(data['album']) + self.title = data['name'] + self.spotify_id = data['id'] + self.artists = [SpotifyArtist(x) for x in data['artists']] + self.thumbnail_href = self.album.thumbnail_href + self.release_date = self.album.release_date + self.track_number = data['track_number'] + self.duration_ms = data['duration_ms'] + self.explicit = data['explicit'] + self.href = data['href'] + self.popularity = data['popularity'] + self.isrc = data['external_ids']['isrc'] + + def __str__(self) -> str: + return f'SpotifyTrack< {self.title} >' + + def __repr__(self) -> str: + return self.__str__() + + def get_lyrics(self, scraper) -> str: + if scraper is None: + raise SpotifyTrackException('SCAPER NOT AVAILABLE!') + return scraper.get_lyrics(self.spotify_id) + + def download_thumbnail(self, scraper) -> bytes: + return scraper.get(self.thumbnail_href).content + + def get_download_link(self, scraper) -> str: + return Deezer.get_track_download_url(Deezer.get_track_data(Deezer.get_track_id_from_isrc(self.isrc)))[0] + + def download(self, scraper) -> bytes: + try: + download_link = self.get_download_link(scraper) + data = Deezer.decrypt_download_data(requests.get(download_link, headers={'Accept':'*/*'}), self.isrc) + return data + except Exception as ex: + raise SpotifyTrackException(f'Failed to download {self.title} | Exception: {ex}') + + def package_download(self, scraper): + self.audio = self.download(scraper) + self.thumbnail = self.download_thumbnail(scraper) + self.lyrics = self.get_lyrics(scraper) + + def preview_title(self): + return f'{", ".join([x.name for x in self.artists])} - {self.title} [{self.album.title}]' + + def download_to_file(self, scraper, output_path: str): + temp_file_path = f'temp/{hashlib.sha1(self.title.encode() + self.album.spotify_id.encode()).hexdigest()}.temp.mp3' + self.package_download(scraper) + with open(temp_file_path, 'wb') as f: + f.write(self.audio) + + audio_file = eyed3.load(temp_file_path) + audio_file.initTag(version=(2, 4, 0)) # version is important + audio_file.tag.title = self.title + audio_file.tag.artist = '/'.join([artist.name for artist in self.artists]) + audio_file.tag.album_artist = '/'.join([artist.name for artist in self.artists]) + audio_file.tag.album = self.album.title + audio_file.tag.original_release_date = datetime.datetime.fromtimestamp(self.album.release_date).year + audio_file.tag.track_num = self.track_number + audio_file.info.time_secs = self.duration_ms / 1000 + audio_file.tag.images.set(3, self.thumbnail, 'image/jpeg', u'cover') + audio_file.tag.lyrics.set(str(self.lyrics)) + audio_file.tag.comments.set('', str(self.data_dump)) + + audio_file.tag.save() + + full_output_path = output_path + '/' + clean_file_path(self.preview_title()) + '.mp3' + os.makedirs(os.path.dirname(full_output_path), exist_ok=True) + shutil.move(temp_file_path, full_output_path) diff --git a/utils/utils.py b/utils/utils.py new file mode 100644 index 0000000..13e5cb4 --- /dev/null +++ b/utils/utils.py @@ -0,0 +1,4 @@ + + +def clean_file_path(prompt: str): + return prompt.replace('/', '').replace('?', '').replace('"', '').replace('*', '').replace('|', '').replace('\\', '').replace(':', '').replace(';', '').replace('>', '').replace('<', '')