Merge pull request #66 from reduxionist/chore/add-lint-request-to-readme-and-lint-code

Lints the code and requests that contributors do the same.
This commit is contained in:
Logykk 2021-10-23 21:41:35 +13:00 committed by GitHub
commit 632320d18f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 127 deletions

View File

@ -52,9 +52,12 @@ Options that can be configured in zs_config.json:
``` ```
### Will my account get banned if I use this tool? ### Will my account get banned if I use this tool?
Currently no user has reported their account getting banned after using ZSpotify. Currently no user has reported their account getting banned after using ZSpotify.
This isn't to say _you_ won't get banned as it is technically againt Spotify's TOS. This isn't to say _you_ won't get banned as it is technically against Spotify's TOS.
**Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned. **Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned.
### Contributing
Please be sure to lint your code with pylint before issuing a pull-request, thanks!
## **Changelog:** ## **Changelog:**
**v2.1 (23 Oct 2021):** **v2.1 (23 Oct 2021):**
- Moved configuration from hard-coded values to separate zs_config.json file. - Moved configuration from hard-coded values to separate zs_config.json file.

View File

@ -7,6 +7,7 @@ It's like youtube-dl, but for Spotify.
(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) (Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org)
""" """
from getpass import getpass
import json import json
import os import os
import os.path import os.path
@ -14,21 +15,21 @@ import platform
import re import re
import sys import sys
import time import time
from getpass import getpass
import music_tag
import requests
from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality
from librespot.core import Session from librespot.core import Session
from librespot.metadata import TrackId, EpisodeId from librespot.metadata import TrackId, EpisodeId
import music_tag
from pydub import AudioSegment from pydub import AudioSegment
import requests
from tqdm import tqdm from tqdm import tqdm
QUALITY = None
SESSION: Session = None SESSION: Session = None
sanitize = ["\\", "/", ":", "*", "?", "'", "<", ">", '"'] SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""]
# user-customizable variables that adjust the core functionality of ZSpotify # user-customizable variables that adjust the core functionality of ZSpotify
with open('zs_config.json') as config_file: with open("zs_config.json", encoding="utf-8") as config_file:
ZS_CONFIG = json.load(config_file) ZS_CONFIG = json.load(config_file)
@ -46,14 +47,13 @@ def clear():
def wait(seconds: int = 3): def wait(seconds: int = 3):
""" Pause for a set number of seconds """ """ Pause for a set number of seconds """
for i in range(seconds)[::-1]: for i in range(seconds)[::-1]:
print("\rWait for %d second(s)..." % (i + 1), end="") print(f"\rWait for {i + 1} second(s)...", end="")
time.sleep(1) time.sleep(1)
def sanitize_data(value): def sanitize_data(value):
""" Returns given string with problematic removed """ """ Returns given string with problematic removed """
global sanitize for i in SANITIZE:
for i in sanitize:
value = value.replace(i, "") value = value.replace(i, "")
return value.replace("|", "-") return value.replace("|", "-")
@ -64,12 +64,11 @@ def split_input(selection):
if "-" in selection: if "-" in selection:
for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1): for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1):
inputs.append(number) inputs.append(number)
return inputs
else: else:
selections = selection.split(",") selections = selection.split(",")
for i in selections: for i in selections:
inputs.append(i.strip()) inputs.append(i.strip())
return inputs return inputs
def splash(): def splash():
@ -86,7 +85,7 @@ def splash():
# two mains functions for logging in and doing client stuff # two mains functions for logging in and doing client stuff
def login(): def login():
""" Authenticates with Spotify and saves credentials to a file """ """ Authenticates with Spotify and saves credentials to a file """
global SESSION global SESSION # pylint: disable=global-statement
if os.path.isfile("credentials.json"): if os.path.isfile("credentials.json"):
try: try:
@ -104,9 +103,9 @@ def login():
pass pass
def client(): def client(): # pylint: disable=too-many-branches,too-many-statements
""" Connects to spotify to perform query's and get songs to download """ """ Connects to spotify to perform query's and get songs to download """
global QUALITY, SESSION global QUALITY # pylint: disable=global-statement
splash() splash()
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
@ -124,11 +123,11 @@ def client():
download_from_user_playlist() download_from_user_playlist()
elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs":
for song in get_saved_tracks(token): for song in get_saved_tracks(token):
if not song['track']['name']: if not song["track"]["name"]:
print( print(
"### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###") "### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###")
else: else:
download_track(song['track']['id'], "Liked Songs/") download_track(song["track"]["id"], "Liked Songs/")
print("\n") print("\n")
else: else:
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
@ -142,9 +141,9 @@ def client():
download_album(album_id_str) download_album(album_id_str)
elif playlist_id_str is not None: elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str) playlist_songs = get_playlist_songs(token, playlist_id_str)
name, creator = get_playlist_info(token, playlist_id_str) name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs: for song in playlist_songs:
download_track(song['track']['id'], download_track(song["track"]["id"],
sanitize_data(name) + "/") sanitize_data(name) + "/")
print("\n") print("\n")
elif episode_id_str is not None: elif episode_id_str is not None:
@ -152,7 +151,6 @@ def client():
elif show_id_str is not None: elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str): for episode in get_show_episodes(token, show_id_str):
download_episode(episode) download_episode(episode)
else: else:
search_text = input("Enter search or URL: ") search_text = input("Enter search or URL: ")
@ -167,9 +165,9 @@ def client():
download_album(album_id_str) download_album(album_id_str)
elif playlist_id_str is not None: elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str) playlist_songs = get_playlist_songs(token, playlist_id_str)
name, creator = get_playlist_info(token, playlist_id_str) name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs: for song in playlist_songs:
download_track(song['track']['id'], download_track(song["track"]["id"],
sanitize_data(name) + "/") sanitize_data(name) + "/")
print("\n") print("\n")
elif episode_id_str is not None: elif episode_id_str is not None:
@ -182,7 +180,8 @@ def client():
# wait() # wait()
def regex_input_for_urls(search_input): def regex_input_for_urls(search_input): # pylint: disable=too-many-locals
""" Since many kinds of search may be passed at the command line, process them all here. """
track_uri_search = re.search( track_uri_search = re.search(
r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", search_input) r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", search_input)
track_url_search = re.search( track_url_search = re.search(
@ -270,24 +269,23 @@ def regex_input_for_urls(search_input):
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str
def get_episode_info(episode_id_str): def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get("https://api.spotify.com/v1/episodes/" + info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}",
episode_id_str, headers={"Authorization": "Bearer %s" % token}).text) headers={"Authorization": f"Bearer {token}"}).text)
if "error" in info: if "error" in info:
return None, None return None, None
else: # print(info["images"][0]["url"])
# print(info['images'][0]['url']) return sanitize_data(info["show"]["name"]), sanitize_data(info["name"])
return sanitize_data(info["show"]["name"]), sanitize_data(info["name"])
def get_show_episodes(access_token, show_id_str): def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring
episodes = [] episodes = []
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/shows/{show_id_str}/episodes', headers=headers).json() f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json()
for episode in resp["items"]: for episode in resp["items"]:
episodes.append(episode["id"]) episodes.append(episode["id"])
@ -295,7 +293,7 @@ def get_show_episodes(access_token, show_id_str):
return episodes return episodes
def download_episode(episode_id_str): def download_episode(episode_id_str): # pylint: disable=missing-function-docstring
podcast_name, episode_name = get_episode_info(episode_id_str) podcast_name, episode_name = get_episode_info(episode_id_str)
extra_paths = podcast_name + "/" extra_paths = podcast_name + "/"
@ -315,15 +313,15 @@ def download_episode(episode_id_str):
extra_paths, exist_ok=True) extra_paths, exist_ok=True)
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", 'wb') as file, tqdm( with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm(
desc=filename, desc=filename,
total=total_size, total=total_size,
unit='B', unit="B",
unit_scale=True, unit_scale=True,
unit_divisor=1024 unit_divisor=1024
) as bar: ) as p_bar:
for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
bar.update(file.write( p_bar.update(file.write(
stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
# convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] + # convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] +
@ -332,7 +330,7 @@ def download_episode(episode_id_str):
# related functions that do stuff with the spotify API # related functions that do stuff with the spotify API
def search(search_term): def search(search_term): # pylint: disable=too-many-locals,too-many-branches
""" Searches Spotify's API for relevant data """ """ Searches Spotify's API for relevant data """
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
@ -344,7 +342,7 @@ def search(search_term):
"q": search_term, "q": search_term,
"type": "track,album,artist,playlist" "type": "track,album,artist,playlist"
}, },
headers={"Authorization": "Bearer %s" % token}, headers={"Authorization": f"Bearer {token}"},
) )
# print(resp.json()) # print(resp.json())
@ -358,12 +356,7 @@ def search(search_term):
explicit = "[E]" explicit = "[E]"
else: else:
explicit = "" explicit = ""
print("%d, %s %s | %s" % ( print(f"{i}, {track['name']} {explicit} | {','.join([artist['name'] for artist in track['artists']])}")
i,
track["name"],
explicit,
",".join([artist["name"] for artist in track["artists"]]),
))
i += 1 i += 1
total_tracks = i - 1 total_tracks = i - 1
print("\n") print("\n")
@ -374,11 +367,7 @@ def search(search_term):
if len(albums) > 0: if len(albums) > 0:
print("### ALBUMS ###") print("### ALBUMS ###")
for album in albums: for album in albums:
print("%d, %s | %s" % ( print(f"{i}, {album['name']} | {','.join([artist['name'] for artist in album['artists']])}")
i,
album["name"],
",".join([artist["name"] for artist in album["artists"]]),
))
i += 1 i += 1
total_albums = i - total_tracks - 1 total_albums = i - total_tracks - 1
print("\n") print("\n")
@ -402,11 +391,7 @@ def search(search_term):
playlists = resp.json()["playlists"]["items"] playlists = resp.json()["playlists"]["items"]
print("### PLAYLISTS ###") print("### PLAYLISTS ###")
for playlist in playlists: for playlist in playlists:
print("%d, %s | %s" % ( print(f"{i}, {playlist['name']} | {playlist['owner']['display_name']}")
i,
playlist["name"],
playlist['owner']['display_name'],
))
i += 1 i += 1
print("\n") print("\n")
@ -428,11 +413,11 @@ def search(search_term):
playlist_choice = playlists[position - playlist_choice = playlists[position -
total_tracks - total_albums - total_artists - 1] total_tracks - total_albums - total_artists - 1]
playlist_songs = get_playlist_songs( playlist_songs = get_playlist_songs(
token, playlist_choice['id']) token, playlist_choice["id"])
for song in playlist_songs: for song in playlist_songs:
if song['track']['id'] is not None: if song["track"]["id"] is not None:
download_track(song['track']['id'], sanitize_data( download_track(song["track"]["id"], sanitize_data(
playlist_choice['name'].strip()) + "/") playlist_choice["name"].strip()) + "/")
print("\n") print("\n")
@ -441,19 +426,19 @@ def get_song_info(song_id):
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id +
'&market=from_token', headers={"Authorization": "Bearer %s" % token}).text) "&market=from_token", headers={"Authorization": f"Bearer {token}"}).text)
artists = [] artists = []
for data in info['tracks'][0]['artists']: for data in info["tracks"][0]["artists"]:
artists.append(sanitize_data(data['name'])) artists.append(sanitize_data(data["name"]))
album_name = sanitize_data(info['tracks'][0]['album']["name"]) album_name = sanitize_data(info["tracks"][0]["album"]["name"])
name = sanitize_data(info['tracks'][0]['name']) name = sanitize_data(info["tracks"][0]["name"])
image_url = info['tracks'][0]['album']['images'][0]['url'] image_url = info["tracks"][0]["album"]["images"][0]["url"]
release_year = info['tracks'][0]['album']['release_date'].split("-")[0] release_year = info["tracks"][0]["album"]["release_date"].split("-")[0]
disc_number = info['tracks'][0]['disc_number'] disc_number = info["tracks"][0]["disc_number"]
track_number = info['tracks'][0]['track_number'] track_number = info["tracks"][0]["track_number"]
scraped_song_id = info['tracks'][0]['id'] scraped_song_id = info["tracks"][0]["id"]
is_playable = info['tracks'][0]['is_playable'] is_playable = info["tracks"][0]["is_playable"]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable
@ -477,16 +462,16 @@ def convert_audio_format(filename):
filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate) filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate)
def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments
""" sets music_tag metadata """ """ sets music_tag metadata """
# print("### SETTING MUSIC TAGS ###") # print("### SETTING MUSIC TAGS ###")
tags = music_tag.load_file(filename) tags = music_tag.load_file(filename)
tags['artist'] = conv_artist_format(artists) tags["artist"] = conv_artist_format(artists)
tags['tracktitle'] = name tags["tracktitle"] = name
tags['album'] = album_name tags["album"] = album_name
tags['year'] = release_year tags["year"] = release_year
tags['discnumber'] = disc_number tags["discnumber"] = disc_number
tags['tracknumber'] = track_number tags["tracknumber"] = track_number
tags.save() tags.save()
@ -495,7 +480,7 @@ def set_music_thumbnail(filename, image_url):
# print("### SETTING THUMBNAIL ###") # print("### SETTING THUMBNAIL ###")
img = requests.get(image_url).content img = requests.get(image_url).content
tags = music_tag.load_file(filename) tags = music_tag.load_file(filename)
tags['artwork'] = img tags["artwork"] = img
tags.save() tags.save()
@ -515,14 +500,14 @@ def get_all_playlists(access_token):
offset = 0 offset = 0
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get("https://api.spotify.com/v1/me/playlists", resp = requests.get("https://api.spotify.com/v1/me/playlists",
headers=headers, params=params).json() headers=headers, params=params).json()
offset += limit offset += limit
playlists.extend(resp['items']) playlists.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return playlists return playlists
@ -535,14 +520,14 @@ def get_playlist_songs(access_token, playlist_id):
limit = 100 limit = 100
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json() f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
@ -550,10 +535,11 @@ def get_playlist_songs(access_token, playlist_id):
def get_playlist_info(access_token, playlist_id): def get_playlist_info(access_token, playlist_id):
""" Returns information scraped from playlist """ """ Returns information scraped from playlist """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json() f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token",
return resp['name'].strip(), resp['owner']['display_name'].strip() headers=headers).json()
return resp["name"].strip(), resp["owner"]["display_name"].strip()
# Extra functions directly related to spotify albums # Extra functions directly related to spotify albums
@ -564,14 +550,14 @@ def get_album_tracks(access_token, album_id):
limit = 50 limit = 50
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json() f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
@ -579,21 +565,21 @@ def get_album_tracks(access_token, album_id):
def get_album_name(access_token, album_id): def get_album_name(access_token, album_id):
""" Returns album name """ """ Returns album name """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json() f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json()
return resp['artists'][0]['name'], sanitize_data(resp['name']) return resp["artists"][0]["name"], sanitize_data(resp["name"])
# Extra functions directly related to spotify artists # Extra functions directly related to spotify artists
def get_artist_albums(access_token, artist_id): def get_artist_albums(access_token, artist_id):
""" Returns artist's albums """ """ Returns artist's albums """
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get( resp = requests.get(
f'https://api.spotify.com/v1/artists/{artist_id}/albums', headers=headers).json() f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json()
# Return a list each album's id # Return a list each album's id
return [resp['items'][i]['id'] for i in range(len(resp['items']))] return [resp["items"][i]["id"] for i in range(len(resp["items"]))]
# Extra functions directly related to our saved tracks # Extra functions directly related to our saved tracks
@ -605,21 +591,21 @@ def get_saved_tracks(access_token):
limit = 50 limit = 50
while True: while True:
headers = {'Authorization': f'Bearer {access_token}'} headers = {"Authorization": f"Bearer {access_token}"}
params = {'limit': limit, 'offset': offset} params = {"limit": limit, "offset": offset}
resp = requests.get('https://api.spotify.com/v1/me/tracks', resp = requests.get("https://api.spotify.com/v1/me/tracks",
headers=headers, params=params).json() headers=headers, params=params).json()
offset += limit offset += limit
songs.extend(resp['items']) songs.extend(resp["items"])
if len(resp['items']) < limit: if len(resp["items"]) < limit:
break break
return songs return songs
# Functions directly related to downloading stuff # Functions directly related to downloading stuff
def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value='', disable_progressbar=False): def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches
""" Downloads raw song audio from Spotify """ """ Downloads raw song audio from Spotify """
try: try:
artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info(
@ -627,18 +613,18 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value
song_name = artists[0] + " - " + name song_name = artists[0] + " - " + name
if prefix: if prefix:
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit( song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit(
) else f'{prefix_value} - {song_name}' ) else f"{prefix_value} - {song_name}"
if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str( filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str(
disc_number) + '/' + song_name + '.' + ZS_CONFIG["MUSIC_FORMAT"]) disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
else: else:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths,
song_name + '.' + ZS_CONFIG["MUSIC_FORMAT"]) song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
except Exception as e: except Exception as err: # pylint: disable=broad-except,unused-variable
print("### SKIPPING SONG - FAILED TO QUERY METADATA ###") print("### SKIPPING SONG - FAILED TO QUERY METADATA ###")
# print(e) # print(err)
else: else:
try: try:
if not is_playable: if not is_playable:
@ -661,22 +647,22 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value
if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
os.makedirs( os.makedirs(
ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + '/', exist_ok=True) ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True)
else: else:
os.makedirs(ZS_CONFIG["ROOT_PATH"] + os.makedirs(ZS_CONFIG["ROOT_PATH"] +
extra_paths, exist_ok=True) extra_paths, exist_ok=True)
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm( with open(filename, "wb") as file, tqdm(
desc=song_name, desc=song_name,
total=total_size, total=total_size,
unit='B', unit="B",
unit_scale=True, unit_scale=True,
unit_divisor=1024, unit_divisor=1024,
disable=disable_progressbar disable=disable_progressbar
) as bar: ) as p_bar:
for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
bar.update(file.write( p_bar.update(file.write(
stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
if not ZS_CONFIG["RAW_AUDIO_AS_IS"]: if not ZS_CONFIG["RAW_AUDIO_AS_IS"]:
@ -687,7 +673,7 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value
if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]: if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]:
time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"]) time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"])
except: except Exception: # pylint: disable=broad-except
print("### SKIPPING:", song_name, print("### SKIPPING:", song_name,
"(GENERAL DOWNLOAD ERROR) ###") "(GENERAL DOWNLOAD ERROR) ###")
if os.path.exists(filename): if os.path.exists(filename):
@ -699,9 +685,9 @@ def download_album(album):
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
artist, album_name = get_album_name(token, album) artist, album_name = get_album_name(token, album)
tracks = get_album_tracks(token, album) tracks = get_album_tracks(token, album)
for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)):
download_track(track['id'], f'{artist}/{album_name}', download_track(track["id"], f"{artist}/{album_name}",
prefix=True, prefix_value=str(n), disable_progressbar=True) prefix=True, prefix_value=str(num), disable_progressbar=True)
def download_artist_albums(artist): def download_artist_albums(artist):
@ -717,12 +703,12 @@ def download_playlist(playlists, playlist_choice):
token = SESSION.tokens().get("user-read-email") token = SESSION.tokens().get("user-read-email")
playlist_songs = get_playlist_songs( playlist_songs = get_playlist_songs(
token, playlists[int(playlist_choice) - 1]['id']) token, playlists[int(playlist_choice) - 1]["id"])
for song in playlist_songs: for song in playlist_songs:
if song['track']['id'] is not None: if song["track"]["id"] is not None:
download_track(song['track']['id'], sanitize_data( download_track(song["track"]["id"], sanitize_data(
playlists[int(playlist_choice) - 1]['name'].strip()) + "/") playlists[int(playlist_choice) - 1]["name"].strip()) + "/")
print("\n") print("\n")
@ -733,7 +719,7 @@ def download_from_user_playlist():
count = 1 count = 1
for playlist in playlists: for playlist in playlists:
print(str(count) + ": " + playlist['name'].strip()) print(str(count) + ": " + playlist["name"].strip())
count += 1 count += 1
print("\n> SELECT A PLAYLIST BY ID") print("\n> SELECT A PLAYLIST BY ID")
@ -759,7 +745,7 @@ def download_from_user_playlist():
# Core functions here # Core functions here
def check_raw(): def check_raw(): # pylint: disable=missing-function-docstring
if ZS_CONFIG["RAW_AUDIO_AS_IS"]: if ZS_CONFIG["RAW_AUDIO_AS_IS"]:
ZS_CONFIG["MUSIC_FORMAT"] = "wav" ZS_CONFIG["MUSIC_FORMAT"] = "wav"