diff --git a/zspotify/app.py b/zspotify/app.py index bb17bc77..c30bceb8 100644 --- a/zspotify/app.py +++ b/zspotify/app.py @@ -48,7 +48,7 @@ def client(args) -> None: if args.liked_songs: for song in get_saved_tracks(): - if not song[TRACK][NAME]: + if not song[TRACK][NAME] or not song[TRACK][ID]: Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n") else: download_track('liked', song[TRACK][ID]) @@ -85,10 +85,17 @@ def download_from_urls(urls: list[str]) -> bool: enum = 1 char_num = len(str(len(playlist_songs))) for song in playlist_songs: - if not song[TRACK][NAME]: + if not song[TRACK][NAME] or not song[TRACK][ID]: Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n") else: - download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name, 'playlist_num': str(enum).zfill(char_num)}) + download_track('playlist', song[TRACK][ID], extra_keys= + { + 'playlist_song_name': song[TRACK][NAME], + 'playlist': name, + 'playlist_num': str(enum).zfill(char_num), + 'playlist_id': playlist_id, + 'playlist_track_id': song[TRACK][ID] + }) enum += 1 elif episode_id is not None: download = True diff --git a/zspotify/config.py b/zspotify/config.py index be8a7349..9aee4225 100644 --- a/zspotify/config.py +++ b/zspotify/config.py @@ -29,6 +29,8 @@ PRINT_API_ERRORS = 'PRINT_API_ERRORS' TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR' MD_ALLGENRES = 'MD_ALLGENRES' MD_GENREDELIMITER = 'MD_GENREDELIMITER' +PRINT_PROGRESS_INFO = 'PRINT_PROGRESS_INFO' +PRINT_WARNINGS = 'PRINT_WARNINGS' CONFIG_VALUES = { ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' }, @@ -52,7 +54,9 @@ CONFIG_VALUES = { PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' }, PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' }, PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' }, - PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' }, + PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' }, + PRINT_PROGRESS_INFO: { 'default': 'True', 'type': bool, 'arg': '--print-progress-info' }, + PRINT_WARNINGS: { 'default': 'True', 'type': bool, 'arg': '--print-warnings' }, MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' }, MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' }, TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' } @@ -154,7 +158,7 @@ class Config: return cls.get(SPLIT_ALBUM_DISCS) @classmethod - def get_chunk_size(cls) -> int(): + def get_chunk_size(cls) -> int: return cls.get(CHUNK_SIZE) @classmethod diff --git a/zspotify/loader.py b/zspotify/loader.py new file mode 100644 index 00000000..faa3cb6e --- /dev/null +++ b/zspotify/loader.py @@ -0,0 +1,72 @@ +# load symbol from: +# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running + +# imports +from itertools import cycle +from shutil import get_terminal_size +from threading import Thread +from time import sleep + +from termoutput import Printer + + +class Loader: + """Busy symbol. + + Can be called inside a context: + + with Loader("This take some Time..."): + # do something + pass + """ + def __init__(self, chan, desc="Loading...", end='', timeout=0.1, mode='std1'): + """ + A loader-like context manager + + Args: + desc (str, optional): The loader's description. Defaults to "Loading...". + end (str, optional): Final print. Defaults to "". + timeout (float, optional): Sleep time between prints. Defaults to 0.1. + """ + self.desc = desc + self.end = end + self.timeout = timeout + self.channel = chan + + self._thread = Thread(target=self._animate, daemon=True) + if mode == 'std1': + self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"] + elif mode == 'std2': + self.steps = ["◜","◝","◞","◟"] + elif mode == 'std3': + self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "] + elif mode == 'prog': + self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"] + + self.done = False + + def start(self): + self._thread.start() + return self + + def _animate(self): + for c in cycle(self.steps): + if self.done: + break + Printer.print_loader(self.channel, f"\r\t{c} {self.desc} ") + sleep(self.timeout) + + def __enter__(self): + self.start() + + def stop(self): + self.done = True + cols = get_terminal_size((80, 20)).columns + Printer.print_loader(self.channel, "\r" + " " * cols) + + if self.end != "": + Printer.print_loader(self.channel, f"\r{self.end}") + + def __exit__(self, exc_type, exc_value, tb): + # handle exceptions with those variables ^ + self.stop() diff --git a/zspotify/termoutput.py b/zspotify/termoutput.py index 06eec980..ea148441 100644 --- a/zspotify/termoutput.py +++ b/zspotify/termoutput.py @@ -1,7 +1,8 @@ +import sys from enum import Enum from tqdm import tqdm -from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS, PRINT_API_ERRORS +from config import * from zspotify import ZSpotify @@ -10,15 +11,28 @@ class PrintChannel(Enum): SKIPS = PRINT_SKIPS DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS ERRORS = PRINT_ERRORS + WARNINGS = PRINT_WARNINGS DOWNLOADS = PRINT_DOWNLOADS API_ERRORS = PRINT_API_ERRORS + PROGRESS_INFO = PRINT_PROGRESS_INFO + + +ERROR_CHANNEL = [PrintChannel.ERRORS, PrintChannel.API_ERRORS] class Printer: @staticmethod def print(channel: PrintChannel, msg: str) -> None: if ZSpotify.CONFIG.get(channel.value): - print(msg) + if channel in ERROR_CHANNEL: + print(msg, file=sys.stderr) + else: + print(msg) + + @staticmethod + def print_loader(channel: PrintChannel, msg: str) -> None: + if ZSpotify.CONFIG.get(channel.value): + print(msg, flush=True, end="") @staticmethod def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000): diff --git a/zspotify/track.py b/zspotify/track.py index 74704eaf..c6cf7aad 100644 --- a/zspotify/track.py +++ b/zspotify/track.py @@ -1,6 +1,5 @@ import os import re -from threading import Thread import time import uuid from typing import Any, Tuple, List @@ -9,7 +8,7 @@ from librespot.audio.decoders import AudioQuality from librespot.metadata import TrackId from ffmpy import FFmpeg -from const import TRACKS, ALBUM, GENRES, GENRE, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ +from const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS from termoutput import Printer, PrintChannel from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \ @@ -17,7 +16,7 @@ from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_down from zspotify import ZSpotify import traceback -from utils import Loader +from loader import Loader def get_saved_tracks() -> list: """ Returns user's saved tracks """ @@ -38,7 +37,7 @@ def get_saved_tracks() -> list: def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, Any, Any, Any, Any, int]: """ Retrieves metadata for downloaded songs """ - with Loader("Fetching track information..."): + with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."): (raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token') if not TRACKS in info: @@ -50,18 +49,18 @@ def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, An for data in info[TRACKS][0][ARTISTS]: artists.append(data[NAME]) # query artist genres via href, which will be the api url - with Loader("Fetching artist information..."): + with Loader(PrintChannel.PROGRESS_INFO, "Fetching artist information..."): (raw, artistInfo) = ZSpotify.invoke_url(f'{data["href"]}') if ZSpotify.CONFIG.get_allGenres() and len(artistInfo[GENRES]) > 0: for genre in artistInfo[GENRES]: genres.append(genre) elif len(artistInfo[GENRES]) > 0: genres.append(artistInfo[GENRES][0]) - + if len(genres) == 0: - Printer.print(PrintChannel.SKIPS, '### No Genre found.') - genres.append('') - + Printer.print(PrintChannel.WARNINGS, '### No Genres found for song ' + info[TRACKS][0][NAME]) + genres.append('') + album_name = info[TRACKS][0][ALBUM][NAME] name = info[TRACKS][0][NAME] image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL] @@ -93,19 +92,23 @@ def get_song_duration(song_id: str) -> float: return duration + # noinspection PyBroadException -def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=False) -> None: +def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None: """ Downloads raw song audio from Spotify """ + if extra_keys is None: + extra_keys = {} + + prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...") + prepare_download_loader.start() + try: output_template = ZSpotify.CONFIG.get_output(mode) (artists, genres, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id) - - prepareDownloadLoader = Loader("Preparing download..."); - prepareDownloadLoader.start() - + song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) for k in extra_keys: @@ -143,24 +146,27 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar= filename = os.path.join(filedir, f'{fname}_{c}{ext}') - except Exception as e: Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###') - Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n") + Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) + for k in extra_keys: + Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k])) + Printer.print(PrintChannel.ERRORS, "\n") Printer.print(PrintChannel.ERRORS, str(e) + "\n") Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") + else: try: if not is_playable: - prepareDownloadLoader.stop(); + prepare_download_loader.stop() Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n") else: if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files(): - prepareDownloadLoader.stop(); + prepare_download_loader.stop() Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n") elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded(): - prepareDownloadLoader.stop(); + prepare_download_loader.stop() Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n") else: @@ -171,7 +177,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar= create_download_directory(filedir) total_size = stream.input_stream.size - prepareDownloadLoader.stop(); + prepare_download_loader.stop() time_start = time.time() downloaded = 0 @@ -217,13 +223,18 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar= time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time()) except Exception as e: Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###') - Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n") + Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) + for k in extra_keys: + Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k])) + Printer.print(PrintChannel.ERRORS, "\n") Printer.print(PrintChannel.ERRORS, str(e) + "\n") Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") if os.path.exists(filename_temp): os.remove(filename_temp) - - prepareDownloadLoader.stop() + + prepare_download_loader.stop() + + def convert_audio_format(filename) -> None: """ Converts raw audio into playable file """ temp_filename = f'{os.path.splitext(filename)[0]}.tmp' @@ -250,9 +261,9 @@ def convert_audio_format(filename) -> None: inputs={temp_filename: None}, outputs={filename: output_params} ) - - with Loader("Converting file..."): + + with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."): ff_m.run() - + if os.path.exists(temp_filename): os.remove(temp_filename) diff --git a/zspotify/utils.py b/zspotify/utils.py index ee20cec2..f56ce5d0 100644 --- a/zspotify/utils.py +++ b/zspotify/utils.py @@ -280,84 +280,3 @@ def fmt_seconds(secs: float) -> str: return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2) else: return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2) - - -# load symbol from: -# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running - -# imports -from itertools import cycle -from shutil import get_terminal_size -from threading import Thread -from time import sleep - -class Loader: - """Busy symbol. - - Can be called inside a context: - - with Loader("This take some Time..."): - # do something - pass - """ - def __init__(self, desc="Loading...", end='', timeout=0.1, mode='std1'): - """ - A loader-like context manager - - Args: - desc (str, optional): The loader's description. Defaults to "Loading...". - end (str, optional): Final print. Defaults to "". - timeout (float, optional): Sleep time between prints. Defaults to 0.1. - """ - self.desc = desc - self.end = end - self.timeout = timeout - - self._thread = Thread(target=self._animate, daemon=True) - if mode == 'std1': - self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"] - elif mode == 'std2': - self.steps = ["◜","◝","◞","◟"] - elif mode == 'std3': - self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "] - elif mode == 'prog': - self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"] - - self.done = False - - def start(self): - self._thread.start() - return self - - def _animate(self): - for c in cycle(self.steps): - if self.done: - break - print(f"\r\t{c} {self.desc} ", flush=True, end="") - sleep(self.timeout) - - def __enter__(self): - self.start() - - def stop(self): - self.done = True - cols = get_terminal_size((80, 20)).columns - print("\r" + " " * cols, end="", flush=True) - - if self.end != "": - print(f"\r{self.end}", flush=True) - - def __exit__(self, exc_type, exc_value, tb): - # handle exceptions with those variables ^ - self.stop() - - -if __name__ == "__main__": - with Loader("Loading with context manager..."): - for i in range(10): - sleep(0.25) - - loader = Loader("Loading with object...", "That was fast!", 0.05).start() - for i in range(10): - sleep(0.25) - loader.stop() diff --git a/zspotify/zspotify.py b/zspotify/zspotify.py index e535a9f3..977c4b0a 100644 --- a/zspotify/zspotify.py +++ b/zspotify/zspotify.py @@ -86,16 +86,18 @@ class ZSpotify: from termoutput import Printer, PrintChannel headers = cls.get_auth_header() response = requests.get(url, headers=headers) - responseText = response.text - responseJson = response.json() + responsetext = response.text + responsejson = response.json() - if 'error' in responseJson and tryCount < 5: - - Printer.Print(PrintChannel.API_ERROR, f"Spotify API Error ({responseJson['error']['status']}): {responseJson['error']['message']}") - time.sleep(5) - return cls.invoke_url(url, tryCount + 1) - - return responseText, responseJson + if 'error' in responsejson: + if tryCount < 5: + Printer.print(PrintChannel.WARNINGS, f"Spotify API Error (try {tryCount}) ({responsejson['error']['status']}): {responsejson['error']['message']}") + time.sleep(5) + return cls.invoke_url(url, tryCount + 1) + + Printer.print(PrintChannel.API_ERRORS, f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}") + + return responsetext, responsejson @classmethod def check_premium(cls) -> bool: