mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2024-11-26 09:53:17 +01:00
2d501ff501
Seems as though the product key can be left out in some cases, so check if it exists beforehand.
489 lines
16 KiB
Python
489 lines
16 KiB
Python
import os
|
|
import os.path
|
|
import platform
|
|
|
|
import unicodedata
|
|
import re
|
|
|
|
import subprocess
|
|
import time
|
|
import sys
|
|
|
|
import requests
|
|
|
|
import json
|
|
|
|
import music_tag
|
|
|
|
from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality
|
|
from librespot.core import Session
|
|
from librespot.metadata import TrackId
|
|
|
|
from pydub import AudioSegment
|
|
|
|
session: Session = None
|
|
|
|
import hashlib
|
|
|
|
|
|
rootPath = "ZSpotify Music/"
|
|
skipExistingFiles = True
|
|
|
|
|
|
#miscellaneous functions for general use
|
|
def clear():
|
|
if platform.system() == "Windows":
|
|
os.system("cls")
|
|
else:
|
|
os.system("clear")
|
|
|
|
def wait(seconds: int = 3):
|
|
for i in range(seconds)[::-1]:
|
|
print("\rWait for %d second(s)..." % (i + 1), end="")
|
|
time.sleep(1)
|
|
|
|
def sanitizeData(value):
|
|
return value.replace("\\", "").replace("/", "").replace(":", "").replace("*", "").replace("?", "").replace("'", "").replace("<", "").replace(">", "").replace('"',"")
|
|
|
|
def splash():
|
|
print("=================================\n"
|
|
"| Spotify Downloader |\n"
|
|
"| |\n"
|
|
"| by Footsiefat/Deathmonger |\n"
|
|
"=================================\n\n\n")
|
|
|
|
|
|
|
|
#two mains functions for logging in and doing client stuff
|
|
def login():
|
|
global session
|
|
|
|
if os.path.isfile("credentials.json"):
|
|
try:
|
|
session = Session.Builder().stored_file().create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
while True:
|
|
user_name = input("UserName: ")
|
|
password = input("Password: ")
|
|
try:
|
|
session = Session.Builder().user_pass(user_name, password).create()
|
|
return
|
|
except RuntimeError:
|
|
pass
|
|
|
|
def client():
|
|
global quality, session
|
|
splash()
|
|
|
|
token = session.tokens().get("user-read-email")
|
|
|
|
if checkPremium(token):
|
|
print("### DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ###")
|
|
quality = AudioQuality.VERY_HIGH
|
|
else:
|
|
print("### DETECTED FREE ACCOUNT - USING HIGH QUALITY ###")
|
|
quality = AudioQuality.HIGH
|
|
|
|
if len(sys.argv) > 1:
|
|
if sys.argv[1] == "-p" or sys.argv[1] == "--playlist":
|
|
downloadFromOurPlaylists()
|
|
elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs":
|
|
savedSongs = get_saved_tracks(token)
|
|
for song in savedSongs:
|
|
downloadTrack(song['track']['id'], "Liked Songs/")
|
|
print("\n")
|
|
else:
|
|
track_uri_search = re.search(
|
|
r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", sys.argv[1])
|
|
track_url_search = re.search(
|
|
r"^(https?://)?open\.spotify\.com/track/(?P<TrackID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
|
|
sys.argv[1],
|
|
)
|
|
|
|
album_uri_search = re.search(
|
|
r"^spotify:album:(?P<AlbumID>[0-9a-zA-Z]{22})$", sys.argv[1])
|
|
album_url_search = re.search(
|
|
r"^(https?://)?open\.spotify\.com/album/(?P<AlbumID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
|
|
sys.argv[1],
|
|
)
|
|
|
|
playlist_uri_search = re.search(
|
|
r"^spotify:playlist:(?P<PlaylistID>[0-9a-zA-Z]{22})$", sys.argv[1])
|
|
playlist_url_search = re.search(
|
|
r"^(https?://)?open\.spotify\.com/playlist/(?P<PlaylistID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
|
|
sys.argv[1],
|
|
)
|
|
|
|
if track_uri_search is not None or track_url_search is not None:
|
|
track_id_str = (track_uri_search
|
|
if track_uri_search is not None else
|
|
track_url_search).group("TrackID")
|
|
|
|
downloadTrack(track_id_str)
|
|
elif album_uri_search is not None or album_url_search is not None:
|
|
album_id_str = (album_uri_search
|
|
if album_uri_search is not None else
|
|
album_url_search).group("AlbumID")
|
|
|
|
downloadAlbum(album_id_str)
|
|
elif playlist_uri_search is not None or playlist_url_search is not None:
|
|
playlist_id_str = (playlist_uri_search
|
|
if playlist_uri_search is not None else
|
|
playlist_url_search).group("PlaylistID")
|
|
|
|
playlistSongs = get_playlist_songs(token, playlist_id_str)
|
|
name, creator = get_playlist_info(token, playlist_id_str)
|
|
for song in playlistSongs:
|
|
downloadTrack(song['track']['id'], sanitizeData(name) + "/")
|
|
print("\n")
|
|
else:
|
|
searchText = input("Enter search: ")
|
|
search(searchText)
|
|
wait()
|
|
|
|
|
|
|
|
#related functions that do stuff with the spotify API
|
|
def search(searchTerm):
|
|
token = session.tokens().get("user-read-email")
|
|
|
|
|
|
resp = requests.get(
|
|
"https://api.spotify.com/v1/search",
|
|
{
|
|
"limit": "10",
|
|
"offset": "0",
|
|
"q": searchTerm,
|
|
"type": "track,album,playlist"
|
|
},
|
|
headers={"Authorization": "Bearer %s" % token},
|
|
)
|
|
|
|
i = 1
|
|
tracks = resp.json()["tracks"]["items"]
|
|
if len(tracks) > 0:
|
|
print("### TRACKS ###")
|
|
for track in tracks:
|
|
print("%d, %s | %s" % (
|
|
i,
|
|
track["name"],
|
|
",".join([artist["name"] for artist in track["artists"]]),
|
|
))
|
|
i += 1
|
|
totalTracks = i - 1
|
|
print("\n")
|
|
else:
|
|
totalTracks = 0
|
|
|
|
|
|
albums = resp.json()["albums"]["items"]
|
|
if len(albums) > 0:
|
|
print("### ALBUMS ###")
|
|
for album in albums:
|
|
print("%d, %s | %s" % (
|
|
i,
|
|
album["name"],
|
|
",".join([artist["name"] for artist in album["artists"]]),
|
|
))
|
|
i += 1
|
|
totalAlbums = i - totalTracks - 1
|
|
print("\n")
|
|
else:
|
|
totalAlbums = 0
|
|
|
|
|
|
playlists = resp.json()["playlists"]["items"]
|
|
if len(playlists) > 0:
|
|
print("### PLAYLISTS ###")
|
|
for playlist in playlists:
|
|
print("%d, %s | %s" % (
|
|
i,
|
|
playlist["name"],
|
|
playlist['owner']['display_name'],
|
|
))
|
|
i += 1
|
|
totalPlaylists = i - totalTracks - totalAlbums - 1
|
|
print("\n")
|
|
else:
|
|
totalPlaylists = 0
|
|
|
|
if len(tracks) + len(albums) + len(playlists) == 0:
|
|
print("NO RESULTS FOUND - EXITING...")
|
|
else:
|
|
position = int(input("SELECT ITEM BY ID: "))
|
|
|
|
if position <= totalTracks:
|
|
trackId = tracks[position - 1]["id"]
|
|
downloadTrack(trackId)
|
|
elif position <= totalAlbums + totalTracks:
|
|
downloadAlbum(albums[position - totalTracks - 1]["id"])
|
|
else:
|
|
playlistChoice = playlists[position - totalTracks - totalAlbums - 1]
|
|
playlistSongs = get_playlist_songs(token, playlistChoice['id'])
|
|
for song in playlistSongs:
|
|
if song['track']['id'] != None:
|
|
downloadTrack(song['track']['id'], sanitizeData(playlistChoice['name'].strip()) + "/")
|
|
print("\n")
|
|
|
|
def getSongInfo(songId):
|
|
token = session.tokens().get("user-read-email")
|
|
|
|
info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + songId + '&market=from_token', headers={"Authorization": "Bearer %s" % token}).text)
|
|
|
|
artists = []
|
|
for x in info['tracks'][0]['artists']:
|
|
artists.append(sanitizeData(x['name']))
|
|
albumName = sanitizeData(info['tracks'][0]['album']["name"])
|
|
name = sanitizeData(info['tracks'][0]['name'])
|
|
imageUrl = info['tracks'][0]['album']['images'][0]['url']
|
|
releaseYear = info['tracks'][0]['album']['release_date'].split("-")[0]
|
|
disc_number = info['tracks'][0]['disc_number']
|
|
track_number = info['tracks'][0]['track_number']
|
|
scrapedSongId = info['tracks'][0]['id']
|
|
isPlayAble = info['tracks'][0]['is_playable']
|
|
|
|
return artists, albumName, name, imageUrl, releaseYear, disc_number, track_number, scrapedSongId, isPlayAble
|
|
|
|
def checkPremium(access_token):
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
resp = requests.get('https://api.spotify.com/v1/me', headers=headers).json()
|
|
if "product" in resp and resp["product"] == "premium":
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
|
|
#Functions directly related to modifying the downloaded audio and its metadata
|
|
def convertToMp3(filename):
|
|
print("### CONVERTING TO MP3 ###")
|
|
raw_audio = AudioSegment.from_file(filename, format="ogg",
|
|
frame_rate=44100, channels=2, sample_width=2)
|
|
if quality == AudioQuality.VERY_HIGH:
|
|
bitrate = "320k"
|
|
else:
|
|
bitrate = "160k"
|
|
raw_audio.export(filename, format="mp3", bitrate=bitrate)
|
|
|
|
def setAudioTags(filename, artists, name, albumName, releaseYear, disc_number, track_number):
|
|
print("### SETTING MUSIC TAGS ###")
|
|
f = music_tag.load_file(filename)
|
|
f['artist'] = convArtistFormat(artists)
|
|
f['tracktitle'] = name
|
|
f['album'] = albumName
|
|
f['year'] = releaseYear
|
|
f['discnumber'] = disc_number
|
|
f['tracknumber'] = track_number
|
|
f.save()
|
|
|
|
def setMusicThumbnail(filename, imageUrl):
|
|
print("### SETTING THUMBNAIL ###")
|
|
r = requests.get(imageUrl).content
|
|
f = music_tag.load_file(filename)
|
|
f['artwork'] = r
|
|
f.save()
|
|
|
|
def convArtistFormat(artists):
|
|
formatted = ""
|
|
for x in artists:
|
|
formatted += x + ", "
|
|
return formatted[:-2]
|
|
|
|
|
|
|
|
#Extra functions directly related to spotify playlists
|
|
def get_all_playlists(access_token):
|
|
playlists = []
|
|
limit = 50
|
|
offset = 0
|
|
|
|
while True:
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
params = {'limit': limit, 'offset': offset}
|
|
resp = requests.get("https://api.spotify.com/v1/me/playlists", headers=headers, params=params).json()
|
|
offset += limit
|
|
playlists.extend(resp['items'])
|
|
|
|
if len(resp['items']) < limit:
|
|
break
|
|
|
|
return playlists
|
|
|
|
def get_playlist_songs(access_token, playlist_id):
|
|
songs = []
|
|
offset = 0
|
|
limit = 100
|
|
|
|
while True:
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
params = {'limit': limit, 'offset': offset}
|
|
resp = requests.get(f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json()
|
|
offset += limit
|
|
songs.extend(resp['items'])
|
|
|
|
if len(resp['items']) < limit:
|
|
break
|
|
|
|
return songs
|
|
|
|
def get_playlist_info(access_token, playlist_id):
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
resp = requests.get(f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json()
|
|
return resp['name'].strip(), resp['owner']['display_name'].strip()
|
|
|
|
|
|
#Extra functions directly related to spotify albums
|
|
def get_album_tracks(access_token, album_id):
|
|
songs = []
|
|
offset = 0
|
|
limit = 50
|
|
|
|
while True:
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
params = {'limit': limit, 'offset': offset}
|
|
resp = requests.get(f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json()
|
|
offset += limit
|
|
songs.extend(resp['items'])
|
|
|
|
if len(resp['items']) < limit:
|
|
break
|
|
|
|
return songs
|
|
|
|
def get_album_name(access_token, album_id):
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
resp = requests.get(f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json()
|
|
return resp['artists'][0]['name'], sanitizeData(resp['name'])
|
|
|
|
|
|
#Extra functions directly related to our saved tracks
|
|
def get_saved_tracks(access_token):
|
|
songs = []
|
|
offset = 0
|
|
limit = 50
|
|
|
|
while True:
|
|
headers = {'Authorization': f'Bearer {access_token}'}
|
|
params = {'limit': limit, 'offset': offset}
|
|
resp = requests.get(f'https://api.spotify.com/v1/me/tracks', headers=headers, params=params).json()
|
|
offset += limit
|
|
songs.extend(resp['items'])
|
|
|
|
if len(resp['items']) < limit:
|
|
break
|
|
|
|
return songs
|
|
|
|
|
|
|
|
#Functions directly related to downloading stuff
|
|
def downloadTrack(track_id_str: str, extra_paths = ""):
|
|
global rootPath, skipExistingFiles
|
|
|
|
track_id = TrackId.from_base62(track_id_str)
|
|
artists, albumName, name, imageUrl, releaseYear, disc_number, track_number, scrapedSongId, isPlayAble = getSongInfo(track_id_str)
|
|
|
|
songName = artists[0] + " - " + name
|
|
filename = rootPath + extra_paths + songName + '.mp3'
|
|
|
|
|
|
if not isPlayAble:
|
|
print("### SKIPPING:", songName, "(SONG IS UNAVAILABLE) ###")
|
|
else:
|
|
if os.path.isfile(filename) and skipExistingFiles:
|
|
print("### SKIPPING:", songName, "(SONG ALREADY EXISTS) ###")
|
|
else:
|
|
if track_id_str != scrapedSongId:
|
|
print("### APPLYING PATCH TO LET SONG DOWNLOAD ###")
|
|
track_id_str = scrapedSongId
|
|
track_id = TrackId.from_base62(track_id_str)
|
|
|
|
print("### FOUND SONG:", songName, " ###")
|
|
|
|
try:
|
|
stream = session.content_feeder().load(
|
|
track_id, VorbisOnlyAudioQuality(quality), False, None)
|
|
except:
|
|
print("### SKIPPING:", songName, "(GENERAL DOWNLOAD ERROR) ###")
|
|
else:
|
|
print("### DOWNLOADING RAW AUDIO ###")
|
|
|
|
if not os.path.isdir(rootPath + extra_paths):
|
|
os.makedirs(rootPath + extra_paths)
|
|
|
|
with open(filename,'wb') as f:
|
|
'''
|
|
chunk_size = 1024 * 16
|
|
buffer = bytearray(chunk_size)
|
|
bpos = 0
|
|
'''
|
|
|
|
#With the updated version of librespot my faster download method broke so we are using the old fallback method
|
|
while True:
|
|
byte = stream.input_stream.stream().read()
|
|
if byte == b'':
|
|
break
|
|
f.write(byte)
|
|
|
|
'''
|
|
while True:
|
|
byte = stream.input_stream.stream().read()
|
|
|
|
if byte == -1:
|
|
# flush buffer before breaking
|
|
if bpos > 0:
|
|
f.write(buffer[0:bpos])
|
|
break
|
|
|
|
print(bpos)
|
|
buffer[bpos] = byte
|
|
bpos += 1
|
|
|
|
if bpos == (chunk_size):
|
|
f.write(buffer)
|
|
bpos = 0
|
|
'''
|
|
try:
|
|
convertToMp3(filename)
|
|
except:
|
|
os.remove(filename)
|
|
print("### SKIPPING:", songName, "(GENERAL CONVERSION ERROR) ###")
|
|
else:
|
|
setAudioTags(filename, artists, name, albumName, releaseYear, disc_number, track_number)
|
|
setMusicThumbnail(filename, imageUrl)
|
|
|
|
def downloadAlbum(album):
|
|
token = session.tokens().get("user-read-email")
|
|
artist, album_name = get_album_name(token, album)
|
|
tracks = get_album_tracks(token, album)
|
|
for track in tracks:
|
|
downloadTrack(track['id'], artist + " - " + album_name + "/")
|
|
print("\n")
|
|
|
|
def downloadFromOurPlaylists():
|
|
token = session.tokens().get("user-read-email")
|
|
playlists = get_all_playlists(token)
|
|
|
|
count = 1
|
|
for playlist in playlists:
|
|
print(str(count) + ": " + playlist['name'].strip())
|
|
count += 1
|
|
|
|
playlistChoice = input("SELECT A PLAYLIST BY ID: ")
|
|
playlistSongs = get_playlist_songs(token, playlists[int(playlistChoice) - 1]['id'])
|
|
for song in playlistSongs:
|
|
if song['track']['id'] != None:
|
|
downloadTrack(song['track']['id'], sanitizeData(playlists[int(playlistChoice) - 1]['name'].strip()) + "/")
|
|
print("\n")
|
|
|
|
|
|
#Core functions here
|
|
def main():
|
|
login()
|
|
client()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|