Merge remote-tracking branch 'upstream/main'

This commit is contained in:
dabreadman 2021-11-26 18:07:18 +00:00
commit 2a45634ea3
8 changed files with 67 additions and 44 deletions

View File

@ -77,6 +77,7 @@ Be aware you have to set boolean values in the commandline like this: `--downloa
| PRINT_DOWNLOAD_PROGRESS | --print-download-progress | Print the download/playlist progress bars
| PRINT_ERRORS | --print-errors | Print errors
| PRINT_DOWNLOADS | --print-downloads | Print messages when a song is finished downloading
| TEMP_DOWNLOAD_DIR | --temp-download-dir | Download tracks to a temporary directory first
### Output format:

View File

@ -26,18 +26,18 @@ def get_album_tracks(album_id):
def get_album_name(album_id):
""" Returns album name """
resp = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}')
(raw, resp) = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}')
return resp[ARTISTS][0][NAME], fix_filename(resp[NAME])
def get_artist_albums(artist_id):
""" Returns artist's albums """
resp = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums?include_groups=album%2Csingle')
(raw, resp) = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums?include_groups=album%2Csingle')
# Return a list each album's id
album_ids = [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))]
# Recursive requests to get all albums including singles an EPs
while resp['next'] is not None:
resp = ZSpotify.invoke_url(resp['next'])
(raw, resp) = ZSpotify.invoke_url(resp['next'])
album_ids.extend([resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))])
return album_ids

View File

@ -27,6 +27,7 @@ PRINT_SKIPS = 'PRINT_SKIPS'
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
PRINT_ERRORS = 'PRINT_ERRORS'
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
@ -50,6 +51,7 @@ CONFIG_VALUES = {
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' },
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
@ -129,11 +131,11 @@ class Config:
@classmethod
def get_root_path(cls) -> str:
return cls.get(ROOT_PATH)
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PATH))
@classmethod
def get_root_podcast_path(cls) -> str:
return cls.get(ROOT_PODCAST_PATH)
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PODCAST_PATH))
@classmethod
def get_skip_existing_files(cls) -> bool:
@ -181,11 +183,17 @@ class Config:
@classmethod
def get_song_archive(cls) -> str:
return cls.get(SONG_ARCHIVE)
return os.path.join(ZSpotify.CONFIG.get_root_path(), cls.get(SONG_ARCHIVE))
@classmethod
def get_credentials_location(cls) -> str:
return cls.get(CREDENTIALS_LOCATION)
return os.path.join(os.getcwd(), cls.get(CREDENTIALS_LOCATION))
@classmethod
def get_temp_download_dir(cls) -> str:
if cls.get(TEMP_DOWNLOAD_DIR) == '':
return ''
return os.path.join(ZSpotify.CONFIG.get_root_path(), cls.get(TEMP_DOWNLOAD_DIR))
@classmethod
def get_output(cls, mode: str) -> str:

View File

@ -42,7 +42,7 @@ def get_playlist_songs(playlist_id):
def get_playlist_info(playlist_id):
""" Returns information scraped from playlist """
resp = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
(raw, resp) = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
return resp['name'].strip(), resp['owner']['display_name'].strip()

View File

@ -14,7 +14,7 @@ SHOWS_URL = 'https://api.spotify.com/v1/shows'
def get_episode_info(episode_id_str) -> Tuple[Optional[str], Optional[str]]:
info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
(raw, info) = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if ERROR in info:
return None, None
return fix_filename(info[SHOW][NAME]), fix_filename(info[NAME])
@ -77,11 +77,7 @@ def download_episode(episode_id) -> None:
direct_download_url = ZSpotify.invoke_url(
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')["data"]["episode"]["audio"]["items"][-1]["url"]
download_directory = os.path.join(
os.path.dirname(__file__),
ZSpotify.CONFIG.get_root_podcast_path(),
extra_paths,
)
download_directory = os.path.join(ZSpotify.CONFIG.get_root_podcast_path(), extra_paths)
download_directory = os.path.realpath(download_directory)
create_download_directory(download_directory)

View File

@ -2,6 +2,7 @@ import math
import os
import re
import time
import uuid
from typing import Any, Tuple, List
from librespot.audio.decoders import AudioQuality
@ -35,27 +36,34 @@ def get_saved_tracks() -> list:
def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
""" Retrieves metadata for downloaded songs """
info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(data[NAME])
album_name = info[TRACKS][0][ALBUM][NAME]
name = info[TRACKS][0][NAME]
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
duration_ms = info[TRACKS][0][DURATION_MS]
if not TRACKS in info:
raise ValueError(f'Invalid response from TRACKS_URL:\n{raw}')
try:
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(data[NAME])
album_name = info[TRACKS][0][ALBUM][NAME]
name = info[TRACKS][0][NAME]
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
duration_ms = info[TRACKS][0][DURATION_MS]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
except Exception as e:
raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw}')
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
def get_song_duration(song_id: str) -> float:
""" Retrieves duration of song in second as is on spotify """
resp = ZSpotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
(raw, resp) = ZSpotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
# get duration in miliseconds
ms_duration = resp['duration_ms']
@ -83,6 +91,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
for k in extra_keys:
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
ext = EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower())
output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
@ -91,11 +101,15 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower()))
output_template = output_template.replace("{ext}", ext)
filename = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), output_template)
filename = os.path.join(ZSpotify.CONFIG.get_root_path(), output_template)
filedir = os.path.dirname(filename)
filename_temp = filename
if ZSpotify.CONFIG.get_temp_download_dir() != '':
filename_temp = os.path.join(ZSpotify.CONFIG.get_temp_download_dir(), f'zspotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = os.path.isfile(filename) and os.path.getsize(filename)
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
@ -113,6 +127,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
else:
try:
if not is_playable:
@ -128,14 +143,13 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
if track_id != scraped_song_id:
track_id = scraped_song_id
track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream(
track_id, ZSpotify.DOWNLOAD_QUALITY)
stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
total_size = stream.input_stream.size
time_start = time.time()
downloaded = 0
with open(filename, 'wb') as file, Printer.progress(
with open(filename_temp, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
@ -155,13 +169,16 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
time_downloaded = time.time()
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
convert_audio_format(filename_temp)
set_audio_tags(filename_temp, artists, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename_temp, image_url)
if filename_temp != filename:
os.rename(filename_temp, filename)
time_finished = time.time()
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, os.path.dirname(__file__))}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, ZSpotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
# add song id to archive file
if ZSpotify.CONFIG.get_skip_previously_downloaded():
@ -176,8 +193,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
if os.path.exists(filename):
os.remove(filename)
if os.path.exists(filename_temp):
os.remove(filename_temp)
def convert_audio_format(filename) -> None:

View File

@ -36,7 +36,7 @@ def get_previously_downloaded() -> List[str]:
""" Returns list of all time downloaded songs """
ids = []
archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
archive_path = ZSpotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
with open(archive_path, 'r', encoding='utf-8') as f:
@ -48,7 +48,7 @@ def get_previously_downloaded() -> List[str]:
def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Adds song id to all time installed songs archive """
archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
archive_path = ZSpotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
with open(archive_path, 'a', encoding='utf-8') as file:

View File

@ -35,7 +35,7 @@ class ZSpotify:
def login(cls):
""" Authenticates with Spotify and saves credentials to a file """
cred_location = os.path.join(os.getcwd(), Config.get_credentials_location())
cred_location = Config.get_credentials_location()
if os.path.isfile(cred_location):
try:
@ -86,7 +86,8 @@ class ZSpotify:
@classmethod
def invoke_url(cls, url):
headers = cls.get_auth_header()
return requests.get(url, headers=headers).json()
response = requests.get(url, headers=headers)
return response.text, response.json()
@classmethod
def check_premium(cls) -> bool: