Merge branch 'main' of https://github.com/Footsiefat/zspotify into Footsiefat-main

This commit is contained in:
yiannisha 2021-10-23 18:59:25 +03:00
commit a02b2f03d1
4 changed files with 254 additions and 196 deletions

3
.gitignore vendored
View File

@ -146,3 +146,6 @@ credentials.json
#Download Folder #Download Folder
ZSpotify\ Music/ ZSpotify\ Music/
ZSpotify\ Podcasts/ ZSpotify\ Podcasts/
# Intellij
.idea

View File

@ -16,7 +16,7 @@ Binaries
- Python 3.8 or greater - Python 3.8 or greater
- ffmpeg* - ffmpeg*
- Git - Git**
Python packages: Python packages:
@ -26,37 +26,52 @@ Python packages:
\*ffmpeg can be installed via apt for Debian-based distros or by downloading the binaries from [ffmpeg.org](https://ffmpeg.org) and placing them in your %PATH% in Windows. \*ffmpeg can be installed via apt for Debian-based distros or by downloading the binaries from [ffmpeg.org](https://ffmpeg.org) and placing them in your %PATH% in Windows.
\*\*Git can be installed via apt for Debian-based distros or by downloading the binaries from [git-scm.com](https://git-scm.com/download/win) for Windows.
``` ```
Command line usage: Command line usage:
python zspotify.py Loads search prompt to find then download a specific track, album or playlist python zspotify.py Loads search prompt to find then download a specific track, album or playlist
python zspotify.py <track/album/playlist/episode url> Downloads the track, album, playlist or podcast episode specified as a command line argument python zspotify.py <track/album/playlist/episode url> Downloads the track, album, playlist or podcast episode specified as a command line argument
python zspotify.py <artist url> Downloads all albums by specified artist
Extra command line options: Extra command line options:
-p, --playlist Downloads a saved playlist from your account -p, --playlist Downloads a saved playlist from your account
-ls, --liked-songs Downloads all the liked songs from your account -ls, --liked-songs Downloads all the liked songs from your account
Special hardcoded options: Options that can be configured in zs_config.json:
ROOT_PATH Change this path if you don't like the default directory where ZSpotify saves the music ROOT_PATH Change this path if you don't like the default directory where ZSpotify saves the music
ROOT_PODCAST_PATH Change this path if you don't like the default directory where ZSpotify saves the podcasts ROOT_PODCAST_PATH Change this path if you don't like the default directory where ZSpotify saves the podcasts
SKIP_EXISTING_FILES Set this to False if you want ZSpotify to overwrite files with the same name rather than skipping the song SKIP_EXISTING_FILES Set this to false if you want ZSpotify to overwrite files with the same name rather than skipping the song
MUSIC_FORMAT Set this to "ogg" if you would rather that format audio over "mp3" MUSIC_FORMAT Set this to "ogg" if you would rather that format audio over "mp3"
RAW_AUDIO_AS_IS Set this to True to only stream the audio to a file and do no re-encoding or post processing RAW_AUDIO_AS_IS Set this to true to only stream the audio to a file and do no re-encoding or post processing
FORCE_PREMIUM Set this to True if ZSpotify isn't automatically detecting that you are using a premium account FORCE_PREMIUM Set this to true if ZSpotify isn't automatically detecting that you are using a premium account
ANTI_BAN_WAIT_TIME Change this setting if the time waited between bulk downloads is too high or low ANTI_BAN_WAIT_TIME Change this setting if the time waited between bulk downloads is too high or low
OVERRIDE_AUTO_WAIT Change this to True if you want to completely disable the wait between songs for faster downloads with the risk of instability OVERRIDE_AUTO_WAIT Change this to true if you want to completely disable the wait between songs for faster downloads with the risk of instability
``` ```
### Will my account get banned if I use this tool?
Currently no user has reported their account getting banned after using ZSpotify.
This isn't to say _you_ won't get banned as it is technically against Spotify's TOS.
**Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned.
### Contributing
Please be sure to lint your code with pylint before issuing a pull-request, thanks!
## **Changelog:** ## **Changelog:**
**v2.1 (23 Oct 2021):**
- Moved configuration from hard-coded values to separate zs_config.json file.
- Add subfolders for each disc.
- Can now search and download all songs by artist.
- Show single progress bar for entire album.
- Added song number at start of track name in albums.
**v2.0 (22 Oct 2021):** **v2.0 (22 Oct 2021):**
- Added progress bar for downloads. - Added progress bar for downloads.
- Added multi-select support for all results when searching. - Added multi-select support for all results when searching.
- Added GPLv3 Licence. - Added GPLv3 Licence.
- Changed welcome banner and removed unnecessary debug print statments. - Changed welcome banner and removed unnecessary debug print statements.
**v1.9 (22 Oct 2021):** **v1.9 (22 Oct 2021):**
- Added Gitea mirror for when the Spotify Glowies come to DMCA the shit out of this. - Added Gitea mirror for when the Spotify Glowies come to DMCA the shit out of this.

12
zs_config.json Normal file
View File

@ -0,0 +1,12 @@
{
"ROOT_PATH": "ZSpotify Music/",
"ROOT_PODCAST_PATH": "ZSpotify Podcasts/",
"SKIP_EXISTING_FILES": true,
"MUSIC_FORMAT": "mp3",
"RAW_AUDIO_AS_IS": false,
"FORCE_PREMIUM": false,
"ANTI_BAN_WAIT_TIME": 1,
"OVERRIDE_AUTO_WAIT": false,
"CHUNK_SIZE": 50000,
"SPLIT_ALBUM_DISCS": false
}

View File

@ -7,6 +7,7 @@ It's like youtube-dl, but for Spotify.
(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) (Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org)
""" """
from getpass import getpass
import json import json
import os import os
import os.path import os.path
@ -14,36 +15,23 @@ import platform
import re import re
import sys import sys
import time import time
from getpass import getpass
import music_tag
import requests
from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality
from librespot.core import Session from librespot.core import Session
from librespot.metadata import TrackId, EpisodeId from librespot.metadata import TrackId, EpisodeId
import music_tag
from pydub import AudioSegment from pydub import AudioSegment
import requests
from tqdm import tqdm from tqdm import tqdm
QUALITY = None
SESSION: Session = None SESSION: Session = None
sanitize = ["\\", "/", ":", "*", "?", "'", "<", ">", '"'] SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""]
# user-customizable variables that adjust the core functionality of ZSpotify
with open("zs_config.json", encoding="utf-8") as config_file:
ZS_CONFIG = json.load(config_file)
# Hardcoded variables that adjust the core functionality of ZSpotify
ROOT_PATH = "ZSpotify Music/"
ROOT_PODCAST_PATH = "ZSpotify Podcasts/"
SKIP_EXISTING_FILES = True
MUSIC_FORMAT = "mp3" # or "ogg"
RAW_AUDIO_AS_IS = False # set to True if you wish to just save the raw audio
FORCE_PREMIUM = False # set to True if not detecting your premium account automatically
# This is how many seconds ZSpotify waits between downloading tracks so spotify doesn't get out the ban hammer
ANTI_BAN_WAIT_TIME = 1
# Set this to True to not wait at all between tracks and just go balls to the wall
OVERRIDE_AUTO_WAIT = False
CHUNK_SIZE = 50000
# miscellaneous functions for general use # miscellaneous functions for general use
@ -59,14 +47,13 @@ def clear():
def wait(seconds: int = 3): def wait(seconds: int = 3):
""" Pause for a set number of seconds """ """ Pause for a set number of seconds """
for i in range(seconds)[::-1]: for i in range(seconds)[::-1]:
print("\rWait for %d second(s)..." % (i + 1), end="") print(f"\rWait for {i + 1} second(s)...", end="")
time.sleep(1) time.sleep(1)
def sanitize_data(value): def sanitize_data(value):
""" Returns given string with problematic removed """ """ Returns given string with problematic removed """
global sanitize for i in SANITIZE:
for i in sanitize:
value = value.replace(i, "") value = value.replace(i, "")
return value.replace("|", "-") return value.replace("|", "-")
@ -77,12 +64,11 @@ def split_input(selection):
if "-" in selection: if "-" in selection:
for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1): for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1):
inputs.append(number) inputs.append(number)
return inputs
else: else:
selections = selection.split(",") selections = selection.split(",")
for i in selections: for i in selections:
inputs.append(i.strip()) inputs.append(i.strip())
return inputs return inputs
def splash(): def splash():
@ -99,7 +85,7 @@ def splash():
# two mains functions for logging in and doing client stuff # two mains functions for logging in and doing client stuff
def login(): def login():
""" Authenticates with Spotify and saves credentials to a file """ """ Authenticates with Spotify and saves credentials to a file """
global SESSION global SESSION # pylint: disable=global-statement
if os.path.isfile("credentials.json"): if os.path.isfile("credentials.json"):
try: try:
@ -117,9 +103,9 @@ def login():
pass pass
def client(): def client(): # pylint: disable=too-many-branches,too-many-statements
""" Connects to spotify to perform query's and get songs to download """ """ Connects to spotify to perform query's and get songs to download """
global QUALITY, SESSION global QUALITY # pylint: disable=global-statement
splash() splash()
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
@ -131,20 +117,45 @@ def client():
print("[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n") print("[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n")
QUALITY = AudioQuality.HIGH QUALITY = AudioQuality.HIGH
if len(sys.argv) > 1: while True:
if sys.argv[1] == "-p" or sys.argv[1] == "--playlist": if len(sys.argv) > 1:
download_from_user_playlist() if sys.argv[1] == "-p" or sys.argv[1] == "--playlist":
elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": download_from_user_playlist()
for song in get_saved_tracks(token): elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs":
if not song['track']['name']: for song in get_saved_tracks(token):
print( if not song["track"]["name"]:
"### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###") print(
else: "### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###")
download_track(song['track']['id'], "Liked Songs/") else:
print("\n") download_track(song["track"]["id"], "Liked Songs/")
print("\n")
else:
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
sys.argv[1])
if track_id_str is not None:
download_track(track_id_str)
elif artist_id_str is not None:
download_artist_albums(artist_id_str)
elif album_id_str is not None:
download_album(album_id_str)
elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str)
name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs:
download_track(song["track"]["id"],
sanitize_data(name) + "/")
print("\n")
elif episode_id_str is not None:
download_episode(episode_id_str)
elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str):
download_episode(episode)
else: else:
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str = regex_input_for_urls( search_text = input("Enter search or URL: ")
sys.argv[1])
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
search_text)
if track_id_str is not None: if track_id_str is not None:
download_track(track_id_str) download_track(track_id_str)
@ -154,9 +165,9 @@ def client():
download_album(album_id_str) download_album(album_id_str)
elif playlist_id_str is not None: elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str) playlist_songs = get_playlist_songs(token, playlist_id_str)
name, creator = get_playlist_info(token, playlist_id_str) name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs: for song in playlist_songs:
download_track(song['track']['id'], download_track(song["track"]["id"],
sanitize_data(name) + "/") sanitize_data(name) + "/")
print("\n") print("\n")
elif episode_id_str is not None: elif episode_id_str is not None:
@ -164,37 +175,13 @@ def client():
elif show_id_str is not None: elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str): for episode in get_show_episodes(token, show_id_str):
download_episode(episode) download_episode(episode)
else:
else: search(search_text)
search_text = input("Enter search or URL: ") # wait()
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
search_text)
if track_id_str is not None:
download_track(track_id_str)
elif artist_id_str is not None:
download_artist_albums(artist_id_str)
elif album_id_str is not None:
download_album(album_id_str)
elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str)
name, creator = get_playlist_info(token, playlist_id_str)
for song in playlist_songs:
download_track(song['track']['id'],
sanitize_data(name) + "/")
print("\n")
elif episode_id_str is not None:
download_episode(episode_id_str)
elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str):
download_episode(episode)
else:
search(search_text)
# wait()
def regex_input_for_urls(search_input): def regex_input_for_urls(search_input): # pylint: disable=too-many-locals
""" Since many kinds of search may be passed at the command line, process them all here. """
track_uri_search = re.search( track_uri_search = re.search(
r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", search_input) r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", search_input)
track_url_search = re.search( track_url_search = re.search(
@ -237,7 +224,6 @@ def regex_input_for_urls(search_input):
search_input, search_input,
) )
if track_uri_search is not None or track_url_search is not None: if track_uri_search is not None or track_url_search is not None:
track_id_str = (track_uri_search track_id_str = (track_uri_search
if track_uri_search is not None else if track_uri_search is not None else
@ -275,32 +261,31 @@ def regex_input_for_urls(search_input):
if artist_uri_search is not None or artist_url_search is not None: if artist_uri_search is not None or artist_url_search is not None:
artist_id_str = (artist_uri_search artist_id_str = (artist_uri_search
if artist_uri_search is not None else if artist_uri_search is not None else
artist_url_search).group("ArtistID") artist_url_search).group("ArtistID")
else: else:
artist_id_str = None artist_id_str = None
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str
def get_episode_info(episode_id_str): def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get("https://api.spotify.com/v1/episodes/" + info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}",
episode_id_str, headers={"Authorization": "Bearer %s" % token}).text) headers={"Authorization": f"Bearer {token}"}).text)
if "error" in info: if "error" in info:
return None, None return None, None
else: # print(info["images"][0]["url"])
# print(info['images'][0]['url']) return sanitize_data(info["show"]["name"]), sanitize_data(info["name"])
return sanitize_data(info["show"]["name"]), sanitize_data(info["name"])
def get_show_episodes(access_token, show_id_str): def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring
episodes = [] episodes = []
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/shows/{show_id_str}/episodes', headers=headers).json() f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json()
for episode in resp["items"]: for episode in resp["items"]:
episodes.append(episode["id"]) episodes.append(episode["id"])
@ -308,9 +293,7 @@ def get_show_episodes(access_token, show_id_str):
return episodes return episodes
def download_episode(episode_id_str): def download_episode(episode_id_str): # pylint: disable=missing-function-docstring
global ROOT_PODCAST_PATH, MUSIC_FORMAT
podcast_name, episode_name = get_episode_info(episode_id_str) podcast_name, episode_name = get_episode_info(episode_id_str)
extra_paths = podcast_name + "/" extra_paths = podcast_name + "/"
@ -326,28 +309,28 @@ def download_episode(episode_id_str):
# print("### DOWNLOADING '" + podcast_name + " - " + # print("### DOWNLOADING '" + podcast_name + " - " +
# episode_name + "' - THIS MAY TAKE A WHILE ###") # episode_name + "' - THIS MAY TAKE A WHILE ###")
if not os.path.isdir(ROOT_PODCAST_PATH + extra_paths): os.makedirs(ZS_CONFIG["ROOT_PODCAST_PATH"] +
os.makedirs(ROOT_PODCAST_PATH + extra_paths) extra_paths, exist_ok=True)
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(ROOT_PODCAST_PATH + extra_paths + filename + ".wav", 'wb') as file, tqdm( with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm(
desc=filename, desc=filename,
total=total_size, total=total_size,
unit='B', unit="B",
unit_scale=True, unit_scale=True,
unit_divisor=1024 unit_divisor=1024
) as bar: ) as p_bar:
for _ in range(int(total_size / CHUNK_SIZE) + 1): for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
bar.update(file.write( p_bar.update(file.write(
stream.input_stream.stream().read(CHUNK_SIZE))) stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
# convert_audio_format(ROOT_PODCAST_PATH + # convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] +
# extra_paths + filename + ".wav") # extra_paths + filename + ".wav")
# related functions that do stuff with the spotify API # related functions that do stuff with the spotify API
def search(search_term): def search(search_term): # pylint: disable=too-many-locals,too-many-branches
""" Searches Spotify's API for relevant data """ """ Searches Spotify's API for relevant data """
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
@ -380,7 +363,7 @@ def search(search_term):
if split == "-t" or split == "-type": if split == "-t" or split == "-type":
allowed_types = ["track", "playlist", "album"] allowed_types = ["track", "playlist", "album", "artist"]
for i in range(index+1, len(splits)): for i in range(index+1, len(splits)):
if splits[i][0] == "-": if splits[i][0] == "-":
break break
@ -392,7 +375,7 @@ def search(search_term):
params["type"].add(splits[i]) params["type"].add(splits[i])
if len(params["type"]) == 0: if len(params["type"]) == 0:
params["type"] = {"track", "album", "playlist"} params["type"] = {"track", "album", "playlist", "artist"}
# Clean search term # Clean search term
search_term_list = [] search_term_list = []
@ -412,7 +395,7 @@ def search(search_term):
"q": params["q"], "q": params["q"],
"type": ",".join(params["type"]) "type": ",".join(params["type"])
}, },
headers={"Authorization": "Bearer %s" % token}, headers={"Authorization": f"Bearer {token}"},
) )
# print(resp.json()) # print(resp.json())
@ -508,6 +491,30 @@ def search(search_term):
else: else:
total_playlists = 0 total_playlists = 0
if "artist" in params["type"]:
artists = resp.json()["artists"]["items"]
if len(artists) > 0:
print("### ARTISTS ###")
for artist in artists:
# collect needed data
dic = {
"id" : artist["id"],
"name" : artist["name"],
"type" : "artist",
}
dics.append(dic)
print("{}, {}".format(
enum,
dic["name"],
))
enum += 1
total_artists = enum - total_tracks - total_albums - total_playlists - 1
print("\n")
else:
total_artists = 0
if total_tracks + total_albums + total_playlists == 0: if total_tracks + total_albums + total_playlists == 0:
print("NO RESULTS FOUND - EXITING...") print("NO RESULTS FOUND - EXITING...")
else: else:
@ -525,6 +532,9 @@ def search(search_term):
# if request is for album # if request is for album
if dic["type"] == "album": if dic["type"] == "album":
download_album(dic["id"]) download_album(dic["id"])
# if request is for artist
if dic["type"] == "artist":
download_artist_albums(dic["id"])
# if request is for playlist # if request is for playlist
if dic["type"] == "playlist": if dic["type"] == "playlist":
playlist_songs = get_playlist_songs(token, dic["id"]) playlist_songs = get_playlist_songs(token, dic["id"])
@ -540,53 +550,52 @@ def get_song_info(song_id):
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id +
'&market=from_token', headers={"Authorization": "Bearer %s" % token}).text) "&market=from_token", headers={"Authorization": f"Bearer {token}"}).text)
artists = [] artists = []
for data in info['tracks'][0]['artists']: for data in info["tracks"][0]["artists"]:
artists.append(sanitize_data(data['name'])) artists.append(sanitize_data(data["name"]))
album_name = sanitize_data(info['tracks'][0]['album']["name"]) album_name = sanitize_data(info["tracks"][0]["album"]["name"])
name = sanitize_data(info['tracks'][0]['name']) name = sanitize_data(info["tracks"][0]["name"])
image_url = info['tracks'][0]['album']['images'][0]['url'] image_url = info["tracks"][0]["album"]["images"][0]["url"]
release_year = info['tracks'][0]['album']['release_date'].split("-")[0] release_year = info["tracks"][0]["album"]["release_date"].split("-")[0]
disc_number = info['tracks'][0]['disc_number'] disc_number = info["tracks"][0]["disc_number"]
track_number = info['tracks'][0]['track_number'] track_number = info["tracks"][0]["track_number"]
scraped_song_id = info['tracks'][0]['id'] scraped_song_id = info["tracks"][0]["id"]
is_playable = info['tracks'][0]['is_playable'] is_playable = info["tracks"][0]["is_playable"]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable
def check_premium(): def check_premium():
""" If user has spotify premium return true """ """ If user has spotify premium return true """
global FORCE_PREMIUM return bool((SESSION.get_user_attribute("type") == "premium") or ZS_CONFIG["FORCE_PREMIUM"])
return bool((SESSION.get_user_attribute("type") == "premium") or FORCE_PREMIUM)
# Functions directly related to modifying the downloaded audio and its metadata # Functions directly related to modifying the downloaded audio and its metadata
def convert_audio_format(filename): def convert_audio_format(filename):
""" Converts raw audio into playable mp3 or ogg vorbis """ """ Converts raw audio into playable mp3 or ogg vorbis """
global MUSIC_FORMAT # print("### CONVERTING TO " + ZS_CONFIG["MUSIC_FORMAT"].upper() + " ###")
# print("### CONVERTING TO " + MUSIC_FORMAT.upper() + " ###")
raw_audio = AudioSegment.from_file(filename, format="ogg", raw_audio = AudioSegment.from_file(filename, format="ogg",
frame_rate=44100, channels=2, sample_width=2) frame_rate=44100, channels=2, sample_width=2)
if QUALITY == AudioQuality.VERY_HIGH: if QUALITY == AudioQuality.VERY_HIGH:
bitrate = "320k" bitrate = "320k"
else: else:
bitrate = "160k" bitrate = "160k"
raw_audio.export(filename, format=MUSIC_FORMAT, bitrate=bitrate) raw_audio.export(
filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate)
def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments
""" sets music_tag metadata """ """ sets music_tag metadata """
# print("### SETTING MUSIC TAGS ###") # print("### SETTING MUSIC TAGS ###")
tags = music_tag.load_file(filename) tags = music_tag.load_file(filename)
tags['artist'] = conv_artist_format(artists) tags["artist"] = conv_artist_format(artists)
tags['tracktitle'] = name tags["tracktitle"] = name
tags['album'] = album_name tags["album"] = album_name
tags['year'] = release_year tags["year"] = release_year
tags['discnumber'] = disc_number tags["discnumber"] = disc_number
tags['tracknumber'] = track_number tags["tracknumber"] = track_number
tags.save() tags.save()
@ -595,7 +604,7 @@ def set_music_thumbnail(filename, image_url):
# print("### SETTING THUMBNAIL ###") # print("### SETTING THUMBNAIL ###")
img = requests.get(image_url).content img = requests.get(image_url).content
tags = music_tag.load_file(filename) tags = music_tag.load_file(filename)
tags['artwork'] = img tags["artwork"] = img
tags.save() tags.save()
@ -615,14 +624,14 @@ def get_all_playlists(access_token):
offset = 0 offset = 0
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get("https://api.spotify.com/v1/me/playlists", resp = requests.get("https://api.spotify.com/v1/me/playlists",
headers=headers, params=params).json() headers=headers, params=params).json()
offset += limit offset += limit
playlists.extend(resp['items']) playlists.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return playlists return playlists
@ -635,14 +644,14 @@ def get_playlist_songs(access_token, playlist_id):
limit = 100 limit = 100
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json() f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
@ -650,10 +659,11 @@ def get_playlist_songs(access_token, playlist_id):
def get_playlist_info(access_token, playlist_id): def get_playlist_info(access_token, playlist_id):
""" Returns information scraped from playlist """ """ Returns information scraped from playlist """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json() f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token",
return resp['name'].strip(), resp['owner']['display_name'].strip() headers=headers).json()
return resp["name"].strip(), resp["owner"]["display_name"].strip()
# Extra functions directly related to spotify albums # Extra functions directly related to spotify albums
@ -664,14 +674,14 @@ def get_album_tracks(access_token, album_id):
limit = 50 limit = 50
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json() f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
@ -679,21 +689,25 @@ def get_album_tracks(access_token, album_id):
def get_album_name(access_token, album_id): def get_album_name(access_token, album_id):
""" Returns album name """ """ Returns album name """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json() f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json()
return resp['artists'][0]['name'], sanitize_data(resp['name']) return resp["artists"][0]["name"], sanitize_data(resp["name"])
# Extra functions directly related to spotify artists # Extra functions directly related to spotify artists
def get_artist_albums(access_token, artist_id): def get_artist_albums(access_token, artist_id):
""" Returns artist's albums """ """ Returns artist's albums """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/artists/{artist_id}/albums', headers=headers).json() f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json()
# Return a list each album's id # Return a list each album's id
return [resp['items'][i]['id'] for i in range(len(resp['items']))] return [resp["items"][i]["id"] for i in range(len(resp["items"]))]
# Extra functions directly related to our saved tracks # Extra functions directly related to our saved tracks
def get_saved_tracks(access_token): def get_saved_tracks(access_token):
""" Returns user's saved tracks """ """ Returns user's saved tracks """
songs = [] songs = []
@ -701,39 +715,47 @@ def get_saved_tracks(access_token):
limit = 50 limit = 50
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get('https://api.spotify.com/v1/me/tracks', resp = requests.get("https://api.spotify.com/v1/me/tracks",
headers=headers, params=params).json() headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
# Functions directly related to downloading stuff # Functions directly related to downloading stuff
def download_track(track_id_str: str, extra_paths=""): def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches
""" Downloads raw song audio from Spotify """ """ Downloads raw song audio from Spotify """
global ROOT_PATH, SKIP_EXISTING_FILES, MUSIC_FORMAT, RAW_AUDIO_AS_IS, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT
try: try:
artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info(
track_id_str) track_id_str)
song_name = artists[0] + " - " + name song_name = artists[0] + " - " + name
filename = ROOT_PATH + extra_paths + song_name + '.' + MUSIC_FORMAT if prefix:
except Exception as e: song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit(
) else f"{prefix_value} - {song_name}"
if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str(
disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
else:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths,
song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
except Exception as err: # pylint: disable=broad-except,unused-variable
print("### SKIPPING SONG - FAILED TO QUERY METADATA ###") print("### SKIPPING SONG - FAILED TO QUERY METADATA ###")
# print(e) # print(err)
else: else:
try: try:
if not is_playable: if not is_playable:
print("### SKIPPING:", song_name, print("### SKIPPING:", song_name,
"(SONG IS UNAVAILABLE) ###") "(SONG IS UNAVAILABLE) ###")
else: else:
if os.path.isfile(filename) and os.path.getsize(filename) and SKIP_EXISTING_FILES: if os.path.isfile(filename) and os.path.getsize(filename) and ZS_CONFIG["SKIP_EXISTING_FILES"]:
print("### SKIPPING:", song_name, print("### SKIPPING:", song_name,
"(SONG ALREADY EXISTS) ###") "(SONG ALREADY EXISTS) ###")
else: else:
@ -747,30 +769,35 @@ def download_track(track_id_str: str, extra_paths=""):
track_id, VorbisOnlyAudioQuality(QUALITY), False, None) track_id, VorbisOnlyAudioQuality(QUALITY), False, None)
# print("### DOWNLOADING RAW AUDIO ###") # print("### DOWNLOADING RAW AUDIO ###")
if not os.path.isdir(ROOT_PATH + extra_paths): if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
os.makedirs(ROOT_PATH + extra_paths) os.makedirs(
ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True)
else:
os.makedirs(ZS_CONFIG["ROOT_PATH"] +
extra_paths, exist_ok=True)
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm( with open(filename, "wb") as file, tqdm(
desc=song_name, desc=song_name,
total=total_size, total=total_size,
unit='B', unit="B",
unit_scale=True, unit_scale=True,
unit_divisor=1024 unit_divisor=1024,
) as bar: disable=disable_progressbar
for _ in range(int(total_size / CHUNK_SIZE) + 1): ) as p_bar:
bar.update(file.write( for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
stream.input_stream.stream().read(CHUNK_SIZE))) p_bar.update(file.write(
stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
if not RAW_AUDIO_AS_IS: if not ZS_CONFIG["RAW_AUDIO_AS_IS"]:
convert_audio_format(filename) convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name, set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number) release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url) set_music_thumbnail(filename, image_url)
if not OVERRIDE_AUTO_WAIT: if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]:
time.sleep(ANTI_BAN_WAIT_TIME) time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"])
except: except Exception: # pylint: disable=broad-except
print("### SKIPPING:", song_name, print("### SKIPPING:", song_name,
"(GENERAL DOWNLOAD ERROR) ###") "(GENERAL DOWNLOAD ERROR) ###")
if os.path.exists(filename): if os.path.exists(filename):
@ -782,9 +809,10 @@ def download_album(album):
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
artist, album_name = get_album_name(token, album) artist, album_name = get_album_name(token, album)
tracks = get_album_tracks(token, album) tracks = get_album_tracks(token, album)
for track in tracks: for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)):
download_track(track['id'], artist + " - " + album_name + "/") download_track(track["id"], f"{artist}/{album_name}",
print("\n") prefix=True, prefix_value=str(num), disable_progressbar=True)
def download_artist_albums(artist): def download_artist_albums(artist):
""" Downloads albums of an artist """ """ Downloads albums of an artist """
@ -793,17 +821,18 @@ def download_artist_albums(artist):
for album_id in albums: for album_id in albums:
download_album(album_id) download_album(album_id)
def download_playlist(playlists, playlist_choice): def download_playlist(playlists, playlist_choice):
"""Downloads all the songs from a playlist""" """Downloads all the songs from a playlist"""
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
playlist_songs = get_playlist_songs( playlist_songs = get_playlist_songs(
token, playlists[int(playlist_choice) - 1]['id']) token, playlists[int(playlist_choice) - 1]["id"])
for song in playlist_songs: for song in playlist_songs:
if song['track']['id'] is not None: if song["track"]["id"] is not None:
download_track(song['track']['id'], sanitize_data( download_track(song["track"]["id"], sanitize_data(
playlists[int(playlist_choice) - 1]['name'].strip()) + "/") playlists[int(playlist_choice) - 1]["name"].strip()) + "/")
print("\n") print("\n")
@ -814,7 +843,7 @@ def download_from_user_playlist():
count = 1 count = 1
for playlist in playlists: for playlist in playlists:
print(str(count) + ": " + playlist['name'].strip()) print(str(count) + ": " + playlist["name"].strip())
count += 1 count += 1
print("\n> SELECT A PLAYLIST BY ID") print("\n> SELECT A PLAYLIST BY ID")
@ -827,7 +856,7 @@ def download_from_user_playlist():
download_playlist(playlists, playlist_choices[0]) download_playlist(playlists, playlist_choices[0])
else: else:
start = int(playlist_choices[0]) start = int(playlist_choices[0])
end = int(playlist_choices[1])+1 end = int(playlist_choices[1]) + 1
print(f"Downloading from {start} to {end}...") print(f"Downloading from {start} to {end}...")
@ -840,10 +869,9 @@ def download_from_user_playlist():
# Core functions here # Core functions here
def check_raw(): def check_raw(): # pylint: disable=missing-function-docstring
global RAW_AUDIO_AS_IS, MUSIC_FORMAT if ZS_CONFIG["RAW_AUDIO_AS_IS"]:
if RAW_AUDIO_AS_IS: ZS_CONFIG["MUSIC_FORMAT"] = "wav"
MUSIC_FORMAT = "wav"
def main(): def main():