diff --git a/zspotify.py b/zspotify.py index bcae95a2..b54a351f 100644 --- a/zspotify.py +++ b/zspotify.py @@ -1,51 +1,57 @@ +""" +ZSpotify +It's like youtube-dl, but for Spotify. +""" + +import json import os import os.path import platform - -import unicodedata import re - -import subprocess -import time import sys - -import requests - -import json +import time import music_tag - +import requests from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality from librespot.core import Session from librespot.metadata import TrackId - from pydub import AudioSegment -session: Session = None +SESSION: Session = None -import hashlib +ROOT_PATH = "ZSpotify Music/" +SKIP_EXISTING_FILES = True +MUSIC_FORMAT = "mp3" # or "ogg" +sanitize = ["\\", "/", ":", "*", "?", "'", "<", ">", '"'] + +# miscellaneous functions for general use -rootPath = "ZSpotify Music/" -skipExistingFiles = True -musicFormat = "mp3" #or "ogg" - -#miscellaneous functions for general use def clear(): + """ Clear the console window """ if platform.system() == "Windows": os.system("cls") else: os.system("clear") + def wait(seconds: int = 3): + """ Pause for a set number of seconds """ for i in range(seconds)[::-1]: print("\rWait for %d second(s)..." % (i + 1), end="") time.sleep(1) -def sanitizeData(value): - return value.replace("\\", "").replace("/", "").replace(":", "").replace("*", "").replace("?", "").replace("'", "").replace("<", "").replace(">", "").replace('"',"").replace("|","-") + +def sanitize_data(value): + """ Returns given string with problematic removed """ + for i in sanitize: + sanitized_value = value.replace(i, "") + return sanitized_value.replace("|", "-") + def splash(): + """ Displays splash screen """ print("=================================\n" "| Spotify Downloader |\n" "| |\n" @@ -53,14 +59,14 @@ def splash(): "=================================\n\n\n") - -#two mains functions for logging in and doing client stuff +# two mains functions for logging in and doing client stuff def login(): - global session + """ Authenticates with Spotify and saves credentials to a file """ + global SESSION if os.path.isfile("credentials.json"): try: - session = Session.Builder().stored_file().create() + SESSION = Session.Builder().stored_file().create() return except RuntimeError: pass @@ -68,31 +74,32 @@ def login(): user_name = input("UserName: ") password = input("Password: ") try: - session = Session.Builder().user_pass(user_name, password).create() + SESSION = Session.Builder().user_pass(user_name, password).create() return except RuntimeError: pass + def client(): - global quality, session + """ Connects to spotify to perform querys and get songs to download """ + global QUALITY, SESSION splash() - token = session.tokens().get("user-read-email") + token = SESSION.tokens().get("user-read-email") - if checkPremium(token): - print("### DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ###") - quality = AudioQuality.VERY_HIGH + if check_premium(token): + print("### DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ###") + QUALITY = AudioQuality.VERY_HIGH else: - print("### DETECTED FREE ACCOUNT - USING HIGH QUALITY ###") - quality = AudioQuality.HIGH + print("### DETECTED FREE ACCOUNT - USING HIGH QUALITY ###") + QUALITY = AudioQuality.HIGH if len(sys.argv) > 1: if sys.argv[1] == "-p" or sys.argv[1] == "--playlist": - downloadFromOurPlaylists() + download_from_user_playlist() elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": - savedSongs = get_saved_tracks(token) - for song in savedSongs: - downloadTrack(song['track']['id'], "Liked Songs/") + for song in get_saved_tracks(token): + download_track(song['track']['id'], "Liked Songs/") print("\n") else: track_uri_search = re.search( @@ -121,41 +128,41 @@ def client(): if track_uri_search is not None else track_url_search).group("TrackID") - downloadTrack(track_id_str) + download_track(track_id_str) elif album_uri_search is not None or album_url_search is not None: album_id_str = (album_uri_search if album_uri_search is not None else album_url_search).group("AlbumID") - downloadAlbum(album_id_str) + download_album(album_id_str) elif playlist_uri_search is not None or playlist_url_search is not None: playlist_id_str = (playlist_uri_search - if playlist_uri_search is not None else - playlist_url_search).group("PlaylistID") + if playlist_uri_search is not None else + playlist_url_search).group("PlaylistID") - playlistSongs = get_playlist_songs(token, playlist_id_str) + playlist_songs = get_playlist_songs(token, playlist_id_str) name, creator = get_playlist_info(token, playlist_id_str) - for song in playlistSongs: - downloadTrack(song['track']['id'], sanitizeData(name) + "/") + for song in playlist_songs: + download_track(song['track']['id'], + sanitize_data(name) + "/") print("\n") else: - searchText = input("Enter search: ") - search(searchText) + search_text = input("Enter search: ") + search(search_text) wait() - -#related functions that do stuff with the spotify API -def search(searchTerm): - token = session.tokens().get("user-read-email") - +# related functions that do stuff with the spotify API +def search(search_term): + """ Searches Spotify's API for relevant data """ + token = SESSION.tokens().get("user-read-email") resp = requests.get( "https://api.spotify.com/v1/search", { "limit": "10", "offset": "0", - "q": searchTerm, + "q": search_term, "type": "track,album,playlist" }, headers={"Authorization": "Bearer %s" % token}, @@ -172,11 +179,10 @@ def search(searchTerm): ",".join([artist["name"] for artist in track["artists"]]), )) i += 1 - totalTracks = i - 1 + total_tracks = i - 1 print("\n") else: - totalTracks = 0 - + total_tracks = 0 albums = resp.json()["albums"]["items"] if len(albums) > 0: @@ -188,114 +194,120 @@ def search(searchTerm): ",".join([artist["name"] for artist in album["artists"]]), )) i += 1 - totalAlbums = i - totalTracks - 1 + total_albums = i - total_tracks - 1 print("\n") else: - totalAlbums = 0 - + total_albums = 0 playlists = resp.json()["playlists"]["items"] - if len(playlists) > 0: - print("### PLAYLISTS ###") - for playlist in playlists: - print("%d, %s | %s" % ( - i, - playlist["name"], - playlist['owner']['display_name'], - )) - i += 1 - totalPlaylists = i - totalTracks - totalAlbums - 1 - print("\n") - else: - totalPlaylists = 0 + print("### PLAYLISTS ###") + for playlist in playlists: + print("%d, %s | %s" % ( + i, + playlist["name"], + playlist['owner']['display_name'], + )) + i += 1 + print("\n") if len(tracks) + len(albums) + len(playlists) == 0: print("NO RESULTS FOUND - EXITING...") else: position = int(input("SELECT ITEM BY ID: ")) - if position <= totalTracks: - trackId = tracks[position - 1]["id"] - downloadTrack(trackId) - elif position <= totalAlbums + totalTracks: - downloadAlbum(albums[position - totalTracks - 1]["id"]) + if position <= total_tracks: + track_id = tracks[position - 1]["id"] + download_track(track_id) + elif position <= total_albums + total_tracks: + download_album(albums[position - total_tracks - 1]["id"]) else: - playlistChoice = playlists[position - totalTracks - totalAlbums - 1] - playlistSongs = get_playlist_songs(token, playlistChoice['id']) - for song in playlistSongs: - if song['track']['id'] != None: - downloadTrack(song['track']['id'], sanitizeData(playlistChoice['name'].strip()) + "/") + playlist_choice = playlists[position - + total_tracks - total_albums - 1] + playlist_songs = get_playlist_songs(token, playlist_choice['id']) + for song in playlist_songs: + if song['track']['id'] is not None: + download_track(song['track']['id'], sanitize_data( + playlist_choice['name'].strip()) + "/") print("\n") -def getSongInfo(songId): - token = session.tokens().get("user-read-email") - info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + songId + '&market=from_token', headers={"Authorization": "Bearer %s" % token}).text) +def get_song_info(song_id): + """ Retrieves metadata for downloaded songs """ + token = SESSION.tokens().get("user-read-email") + + info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + + '&market=from_token', headers={"Authorization": "Bearer %s" % token}).text) artists = [] - for x in info['tracks'][0]['artists']: - artists.append(sanitizeData(x['name'])) - albumName = sanitizeData(info['tracks'][0]['album']["name"]) - name = sanitizeData(info['tracks'][0]['name']) - imageUrl = info['tracks'][0]['album']['images'][0]['url'] - releaseYear = info['tracks'][0]['album']['release_date'].split("-")[0] + for data in info['tracks'][0]['artists']: + artists.append(sanitize_data(data['name'])) + album_name = sanitize_data(info['tracks'][0]['album']["name"]) + name = sanitize_data(info['tracks'][0]['name']) + image_url = info['tracks'][0]['album']['images'][0]['url'] + release_year = info['tracks'][0]['album']['release_date'].split("-")[0] disc_number = info['tracks'][0]['disc_number'] track_number = info['tracks'][0]['track_number'] - scrapedSongId = info['tracks'][0]['id'] - isPlayAble = info['tracks'][0]['is_playable'] + scraped_song_id = info['tracks'][0]['id'] + is_playable = info['tracks'][0]['is_playable'] - return artists, albumName, name, imageUrl, releaseYear, disc_number, track_number, scrapedSongId, isPlayAble + return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable -def checkPremium(access_token): + +def check_premium(access_token): + """ If user has spotify premium return true """ headers = {'Authorization': f'Bearer {access_token}'} - resp = requests.get('https://api.spotify.com/v1/me', headers=headers).json() - if "product" in resp and resp["product"] == "premium": - return True - else: - return False + resp = requests.get('https://api.spotify.com/v1/me', + headers=headers).json() + return bool("product" in resp and resp["product"] == "premium") - -#Functions directly related to modifying the downloaded audio and its metadata -def convertAudioFormat(filename): - global musicFormat - print("### CONVERTING TO " + musicFormat.upper() + " ###") +# Functions directly related to modifying the downloaded audio and its metadata +def convert_audio_format(filename): + """ Converts raw audio into playable mp3 or ogg vorbis """ + global MUSIC_FORMAT + print("### CONVERTING TO " + MUSIC_FORMAT.upper() + " ###") raw_audio = AudioSegment.from_file(filename, format="ogg", - frame_rate=44100, channels=2, sample_width=2) - if quality == AudioQuality.VERY_HIGH: + frame_rate=44100, channels=2, sample_width=2) + if QUALITY == AudioQuality.VERY_HIGH: bitrate = "320k" else: bitrate = "160k" - raw_audio.export(filename, format=musicFormat, bitrate=bitrate) + raw_audio.export(filename, format=MUSIC_FORMAT, bitrate=bitrate) -def setAudioTags(filename, artists, name, albumName, releaseYear, disc_number, track_number): + +def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): + """ sets music_tag metadata """ print("### SETTING MUSIC TAGS ###") - f = music_tag.load_file(filename) - f['artist'] = convArtistFormat(artists) - f['tracktitle'] = name - f['album'] = albumName - f['year'] = releaseYear - f['discnumber'] = disc_number - f['tracknumber'] = track_number - f.save() + tags = music_tag.load_file(filename) + tags['artist'] = conv_artist_format(artists) + tags['tracktitle'] = name + tags['album'] = album_name + tags['year'] = release_year + tags['discnumber'] = disc_number + tags['tracknumber'] = track_number + tags.save() -def setMusicThumbnail(filename, imageUrl): + +def set_music_thumbnail(filename, image_url): + """ Downloads cover artwork """ print("### SETTING THUMBNAIL ###") - r = requests.get(imageUrl).content - f = music_tag.load_file(filename) - f['artwork'] = r - f.save() + img = requests.get(image_url).content + tags = music_tag.load_file(filename) + tags['artwork'] = img + tags.save() -def convArtistFormat(artists): + +def conv_artist_format(artists): + """ Returns converted artist format """ formatted = "" for x in artists: formatted += x + ", " return formatted[:-2] - -#Extra functions directly related to spotify playlists +# Extra functions directly related to spotify playlists def get_all_playlists(access_token): + """ Returns list of users playlists """ playlists = [] limit = 50 offset = 0 @@ -303,7 +315,8 @@ def get_all_playlists(access_token): while True: headers = {'Authorization': f'Bearer {access_token}'} params = {'limit': limit, 'offset': offset} - resp = requests.get("https://api.spotify.com/v1/me/playlists", headers=headers, params=params).json() + resp = requests.get("https://api.spotify.com/v1/me/playlists", + headers=headers, params=params).json() offset += limit playlists.extend(resp['items']) @@ -312,7 +325,9 @@ def get_all_playlists(access_token): return playlists + def get_playlist_songs(access_token, playlist_id): + """ returns list of songs in a playlist """ songs = [] offset = 0 limit = 100 @@ -320,7 +335,8 @@ def get_playlist_songs(access_token, playlist_id): while True: headers = {'Authorization': f'Bearer {access_token}'} params = {'limit': limit, 'offset': offset} - resp = requests.get(f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json() + resp = requests.get( + f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json() offset += limit songs.extend(resp['items']) @@ -329,14 +345,18 @@ def get_playlist_songs(access_token, playlist_id): return songs + def get_playlist_info(access_token, playlist_id): - headers = {'Authorization': f'Bearer {access_token}'} - resp = requests.get(f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json() - return resp['name'].strip(), resp['owner']['display_name'].strip() + """ Returns information scraped from playlist """ + headers = {'Authorization': f'Bearer {access_token}'} + resp = requests.get( + f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json() + return resp['name'].strip(), resp['owner']['display_name'].strip() -#Extra functions directly related to spotify albums +# Extra functions directly related to spotify albums def get_album_tracks(access_token, album_id): + """ Returns album tracklist """ songs = [] offset = 0 limit = 50 @@ -344,7 +364,8 @@ def get_album_tracks(access_token, album_id): while True: headers = {'Authorization': f'Bearer {access_token}'} params = {'limit': limit, 'offset': offset} - resp = requests.get(f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json() + resp = requests.get( + f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json() offset += limit songs.extend(resp['items']) @@ -353,14 +374,18 @@ def get_album_tracks(access_token, album_id): return songs + def get_album_name(access_token, album_id): + """ Returns album name """ headers = {'Authorization': f'Bearer {access_token}'} - resp = requests.get(f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json() - return resp['artists'][0]['name'], sanitizeData(resp['name']) + resp = requests.get( + f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json() + return resp['artists'][0]['name'], sanitize_data(resp['name']) -#Extra functions directly related to our saved tracks +# Extra functions directly related to our saved tracks def get_saved_tracks(access_token): + """ Returns user's saved tracks """ songs = [] offset = 0 limit = 50 @@ -368,7 +393,8 @@ def get_saved_tracks(access_token): while True: headers = {'Authorization': f'Bearer {access_token}'} params = {'limit': limit, 'offset': offset} - resp = requests.get(f'https://api.spotify.com/v1/me/tracks', headers=headers, params=params).json() + resp = requests.get('https://api.spotify.com/v1/me/tracks', + headers=headers, params=params).json() offset += limit songs.extend(resp['items']) @@ -378,64 +404,65 @@ def get_saved_tracks(access_token): return songs - -#Functions directly related to downloading stuff -def downloadTrack(track_id_str: str, extra_paths = ""): - global rootPath, skipExistingFiles, musicFormat +# Functions directly related to downloading stuff +def download_track(track_id_str: str, extra_paths=""): + """ Downloads raw song audio from Spotify """ + global ROOT_PATH, SKIP_EXISTING_FILES, MUSIC_FORMAT track_id = TrackId.from_base62(track_id_str) - artists, albumName, name, imageUrl, releaseYear, disc_number, track_number, scrapedSongId, isPlayAble = getSongInfo(track_id_str) + artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( + track_id_str) - songName = artists[0] + " - " + name - filename = rootPath + extra_paths + songName + '.' + musicFormat + song_name = artists[0] + " - " + name + filename = ROOT_PATH + extra_paths + song_name + '.' + MUSIC_FORMAT - - if not isPlayAble: - print("### SKIPPING:", songName, "(SONG IS UNAVAILABLE) ###") + if not is_playable: + print("### SKIPPING:", song_name, "(SONG IS UNAVAILABLE) ###") else: - if os.path.isfile(filename) and skipExistingFiles: - print("### SKIPPING:", songName, "(SONG ALREADY EXISTS) ###") + if os.path.isfile(filename) and SKIP_EXISTING_FILES: + print("### SKIPPING:", song_name, "(SONG ALREADY EXISTS) ###") else: - if track_id_str != scrapedSongId: + if track_id_str != scraped_song_id: print("### APPLYING PATCH TO LET SONG DOWNLOAD ###") - track_id_str = scrapedSongId + track_id_str = scraped_song_id track_id = TrackId.from_base62(track_id_str) - print("### FOUND SONG:", songName, " ###") + print("### FOUND SONG:", song_name, " ###") try: - stream = session.content_feeder().load( - track_id, VorbisOnlyAudioQuality(quality), False, None) + stream = SESSION.content_feeder().load( + track_id, VorbisOnlyAudioQuality(QUALITY), False, None) except: - print("### SKIPPING:", songName, "(GENERAL DOWNLOAD ERROR) ###") + print("### SKIPPING:", song_name, + "(GENERAL DOWNLOAD ERROR) ###") else: print("### DOWNLOADING RAW AUDIO ###") - if not os.path.isdir(rootPath + extra_paths): - os.makedirs(rootPath + extra_paths) + if not os.path.isdir(ROOT_PATH + extra_paths): + os.makedirs(ROOT_PATH + extra_paths) - with open(filename,'wb') as f: - ''' + with open(filename, 'wb') as file: + """ chunk_size = 1024 * 16 buffer = bytearray(chunk_size) bpos = 0 - ''' + """ - #With the updated version of librespot my faster download method broke so we are using the old fallback method + # With the updated version of librespot my faster download method broke so we are using the old fallback method while True: byte = stream.input_stream.stream().read() if byte == b'': break - f.write(byte) + file.write(byte) - ''' + """ while True: byte = stream.input_stream.stream().read() if byte == -1: # flush buffer before breaking if bpos > 0: - f.write(buffer[0:bpos]) + file.write(buffer[0:bpos]) break print(bpos) @@ -443,28 +470,34 @@ def downloadTrack(track_id_str: str, extra_paths = ""): bpos += 1 if bpos == (chunk_size): - f.write(buffer) + file.write(buffer) bpos = 0 - ''' + """ try: - convertAudioFormat(filename) + convert_audio_format(filename) except: os.remove(filename) - print("### SKIPPING:", songName, "(GENERAL CONVERSION ERROR) ###") + print("### SKIPPING:", song_name, + "(GENERAL CONVERSION ERROR) ###") else: - setAudioTags(filename, artists, name, albumName, releaseYear, disc_number, track_number) - setMusicThumbnail(filename, imageUrl) + set_audio_tags(filename, artists, name, album_name, + release_year, disc_number, track_number) + set_music_thumbnail(filename, image_url) -def downloadAlbum(album): - token = session.tokens().get("user-read-email") + +def download_album(album): + """ Downloads songs from an album """ + token = SESSION.tokens().get("user-read-email") artist, album_name = get_album_name(token, album) tracks = get_album_tracks(token, album) for track in tracks: - downloadTrack(track['id'], artist + " - " + album_name + "/") + download_track(track['id'], artist + " - " + album_name + "/") print("\n") -def downloadFromOurPlaylists(): - token = session.tokens().get("user-read-email") + +def download_from_user_playlist(): + """ Downloads songs from users playlist """ + token = SESSION.tokens().get("user-read-email") playlists = get_all_playlists(token) count = 1 @@ -472,18 +505,23 @@ def downloadFromOurPlaylists(): print(str(count) + ": " + playlist['name'].strip()) count += 1 - playlistChoice = input("SELECT A PLAYLIST BY ID: ") - playlistSongs = get_playlist_songs(token, playlists[int(playlistChoice) - 1]['id']) - for song in playlistSongs: - if song['track']['id'] != None: - downloadTrack(song['track']['id'], sanitizeData(playlists[int(playlistChoice) - 1]['name'].strip()) + "/") + playlist_choice = input("SELECT A PLAYLIST BY ID: ") + playlist_songs = get_playlist_songs( + token, playlists[int(playlist_choice) - 1]['id']) + for song in playlist_songs: + if song['track']['id'] is not None: + download_track(song['track']['id'], sanitize_data( + playlists[int(playlist_choice) - 1]['name'].strip()) + "/") print("\n") +# Core functions here + -#Core functions here def main(): + """ Main function """ login() client() + if __name__ == "__main__": main()