mirror of
				https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
				synced 2025-11-03 21:10:34 +00:00 
			
		
		
		
	Merge remote-tracking branch 'upstream/main'
This commit is contained in:
		
						commit
						e71700ad97
					
				@ -48,7 +48,7 @@ def client(args) -> None:
 | 
			
		||||
 | 
			
		||||
    if args.liked_songs:
 | 
			
		||||
        for song in get_saved_tracks():
 | 
			
		||||
            if not song[TRACK][NAME]:
 | 
			
		||||
            if not song[TRACK][NAME] or not song[TRACK][ID]:
 | 
			
		||||
                Printer.print(PrintChannel.SKIPS, '###   SKIPPING:  SONG DOES NOT EXIST ON SPOTIFY ANYMORE   ###' + "\n")
 | 
			
		||||
            else:
 | 
			
		||||
                download_track('liked', song[TRACK][ID])
 | 
			
		||||
@ -85,10 +85,17 @@ def download_from_urls(urls: list[str]) -> bool:
 | 
			
		||||
            enum = 1
 | 
			
		||||
            char_num = len(str(len(playlist_songs)))
 | 
			
		||||
            for song in playlist_songs:
 | 
			
		||||
                if not song[TRACK][NAME]:
 | 
			
		||||
                if not song[TRACK][NAME] or not song[TRACK][ID]:
 | 
			
		||||
                    Printer.print(PrintChannel.SKIPS, '###   SKIPPING:  SONG DOES NOT EXIST ON SPOTIFY ANYMORE   ###' + "\n")
 | 
			
		||||
                else:
 | 
			
		||||
                    download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name, 'playlist_num': str(enum).zfill(char_num)})
 | 
			
		||||
                    download_track('playlist', song[TRACK][ID], extra_keys=
 | 
			
		||||
                    {
 | 
			
		||||
                        'playlist_song_name': song[TRACK][NAME],
 | 
			
		||||
                        'playlist': name,
 | 
			
		||||
                        'playlist_num': str(enum).zfill(char_num),
 | 
			
		||||
                        'playlist_id': playlist_id,
 | 
			
		||||
                        'playlist_track_id': song[TRACK][ID]
 | 
			
		||||
                    })
 | 
			
		||||
                    enum += 1
 | 
			
		||||
        elif episode_id is not None:
 | 
			
		||||
            download = True
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,8 @@ PRINT_API_ERRORS = 'PRINT_API_ERRORS'
 | 
			
		||||
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
 | 
			
		||||
MD_ALLGENRES = 'MD_ALLGENRES'
 | 
			
		||||
MD_GENREDELIMITER = 'MD_GENREDELIMITER'
 | 
			
		||||
PRINT_PROGRESS_INFO = 'PRINT_PROGRESS_INFO'
 | 
			
		||||
PRINT_WARNINGS = 'PRINT_WARNINGS'
 | 
			
		||||
 | 
			
		||||
CONFIG_VALUES = {
 | 
			
		||||
    ROOT_PATH:                  { 'default': '../ZSpotify Music/',    'type': str,  'arg': '--root-path'                  },
 | 
			
		||||
@ -52,7 +54,9 @@ CONFIG_VALUES = {
 | 
			
		||||
    PRINT_DOWNLOAD_PROGRESS:    { 'default': 'True',                  'type': bool, 'arg': '--print-download-progress'    },
 | 
			
		||||
    PRINT_ERRORS:               { 'default': 'True',                  'type': bool, 'arg': '--print-errors'               },
 | 
			
		||||
    PRINT_DOWNLOADS:            { 'default': 'False',                 'type': bool, 'arg': '--print-downloads'            },
 | 
			
		||||
    PRINT_API_ERRORS:           { 'default': 'False',                 'type': bool, 'arg': '--print-api-errors'            },
 | 
			
		||||
    PRINT_API_ERRORS:           { 'default': 'False',                 'type': bool, 'arg': '--print-api-errors'           },
 | 
			
		||||
    PRINT_PROGRESS_INFO:        { 'default': 'True',                  'type': bool, 'arg': '--print-progress-info'        },
 | 
			
		||||
    PRINT_WARNINGS:             { 'default': 'True',                  'type': bool, 'arg': '--print-warnings'             },
 | 
			
		||||
    MD_ALLGENRES:               { 'default': 'False',                 'type': bool, 'arg': '--md-allgenres'               },
 | 
			
		||||
    MD_GENREDELIMITER:          { 'default': ';',                     'type': str,  'arg': '--md-genredelimiter'          },
 | 
			
		||||
    TEMP_DOWNLOAD_DIR:          { 'default': '',                      'type': str,  'arg': '--temp-download-dir'          }
 | 
			
		||||
@ -154,7 +158,7 @@ class Config:
 | 
			
		||||
        return cls.get(SPLIT_ALBUM_DISCS)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def get_chunk_size(cls) -> int():
 | 
			
		||||
    def get_chunk_size(cls) -> int:
 | 
			
		||||
        return cls.get(CHUNK_SIZE)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										72
									
								
								zspotify/loader.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								zspotify/loader.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,72 @@
 | 
			
		||||
# load symbol from:
 | 
			
		||||
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
 | 
			
		||||
 | 
			
		||||
# imports
 | 
			
		||||
from itertools import cycle
 | 
			
		||||
from shutil import get_terminal_size
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
from termoutput import Printer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Loader:
 | 
			
		||||
    """Busy symbol.
 | 
			
		||||
 | 
			
		||||
    Can be called inside a context:
 | 
			
		||||
 | 
			
		||||
    with Loader("This take some Time..."):
 | 
			
		||||
        # do something
 | 
			
		||||
        pass
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, chan, desc="Loading...", end='', timeout=0.1, mode='std1'):
 | 
			
		||||
        """
 | 
			
		||||
        A loader-like context manager
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            desc (str, optional): The loader's description. Defaults to "Loading...".
 | 
			
		||||
            end (str, optional): Final print. Defaults to "".
 | 
			
		||||
            timeout (float, optional): Sleep time between prints. Defaults to 0.1.
 | 
			
		||||
        """
 | 
			
		||||
        self.desc = desc
 | 
			
		||||
        self.end = end
 | 
			
		||||
        self.timeout = timeout
 | 
			
		||||
        self.channel = chan
 | 
			
		||||
 | 
			
		||||
        self._thread = Thread(target=self._animate, daemon=True)
 | 
			
		||||
        if mode == 'std1':
 | 
			
		||||
            self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
 | 
			
		||||
        elif mode == 'std2':
 | 
			
		||||
            self.steps = ["◜","◝","◞","◟"]
 | 
			
		||||
        elif mode == 'std3':
 | 
			
		||||
            self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "]
 | 
			
		||||
        elif mode == 'prog':
 | 
			
		||||
            self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
 | 
			
		||||
 | 
			
		||||
        self.done = False
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
        self._thread.start()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def _animate(self):
 | 
			
		||||
        for c in cycle(self.steps):
 | 
			
		||||
            if self.done:
 | 
			
		||||
                break
 | 
			
		||||
            Printer.print_loader(self.channel, f"\r\t{c} {self.desc} ")
 | 
			
		||||
            sleep(self.timeout)
 | 
			
		||||
 | 
			
		||||
    def __enter__(self):
 | 
			
		||||
        self.start()
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self.done = True
 | 
			
		||||
        cols = get_terminal_size((80, 20)).columns
 | 
			
		||||
        Printer.print_loader(self.channel, "\r" + " " * cols)
 | 
			
		||||
 | 
			
		||||
        if self.end != "":
 | 
			
		||||
            Printer.print_loader(self.channel, f"\r{self.end}")
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, exc_type, exc_value, tb):
 | 
			
		||||
        # handle exceptions with those variables ^
 | 
			
		||||
        self.stop()
 | 
			
		||||
@ -1,7 +1,8 @@
 | 
			
		||||
import sys
 | 
			
		||||
from enum import Enum
 | 
			
		||||
from tqdm import tqdm
 | 
			
		||||
 | 
			
		||||
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS, PRINT_API_ERRORS
 | 
			
		||||
from config import *
 | 
			
		||||
from zspotify import ZSpotify
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -10,15 +11,28 @@ class PrintChannel(Enum):
 | 
			
		||||
    SKIPS = PRINT_SKIPS
 | 
			
		||||
    DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
 | 
			
		||||
    ERRORS = PRINT_ERRORS
 | 
			
		||||
    WARNINGS = PRINT_WARNINGS
 | 
			
		||||
    DOWNLOADS = PRINT_DOWNLOADS
 | 
			
		||||
    API_ERRORS = PRINT_API_ERRORS
 | 
			
		||||
    PROGRESS_INFO = PRINT_PROGRESS_INFO
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ERROR_CHANNEL = [PrintChannel.ERRORS, PrintChannel.API_ERRORS]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Printer:
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def print(channel: PrintChannel, msg: str) -> None:
 | 
			
		||||
        if ZSpotify.CONFIG.get(channel.value):
 | 
			
		||||
            print(msg)
 | 
			
		||||
            if channel in ERROR_CHANNEL:
 | 
			
		||||
                print(msg, file=sys.stderr)
 | 
			
		||||
            else:
 | 
			
		||||
                print(msg)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def print_loader(channel: PrintChannel, msg: str) -> None:
 | 
			
		||||
        if ZSpotify.CONFIG.get(channel.value):
 | 
			
		||||
            print(msg, flush=True, end="")
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,5 @@
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
from threading import Thread
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
from typing import Any, Tuple, List
 | 
			
		||||
@ -9,7 +8,7 @@ from librespot.audio.decoders import AudioQuality
 | 
			
		||||
from librespot.metadata import TrackId
 | 
			
		||||
from ffmpy import FFmpeg
 | 
			
		||||
 | 
			
		||||
from const import TRACKS, ALBUM, GENRES, GENRE, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
 | 
			
		||||
from const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
 | 
			
		||||
    RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
 | 
			
		||||
from termoutput import Printer, PrintChannel
 | 
			
		||||
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
 | 
			
		||||
@ -17,7 +16,7 @@ from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_down
 | 
			
		||||
from zspotify import ZSpotify
 | 
			
		||||
import traceback
 | 
			
		||||
 | 
			
		||||
from utils import Loader
 | 
			
		||||
from loader import Loader
 | 
			
		||||
 | 
			
		||||
def get_saved_tracks() -> list:
 | 
			
		||||
    """ Returns user's saved tracks """
 | 
			
		||||
@ -38,7 +37,7 @@ def get_saved_tracks() -> list:
 | 
			
		||||
 | 
			
		||||
def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
 | 
			
		||||
    """ Retrieves metadata for downloaded songs """
 | 
			
		||||
    with Loader("Fetching track information..."):
 | 
			
		||||
    with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
 | 
			
		||||
        (raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
 | 
			
		||||
 | 
			
		||||
    if not TRACKS in info:
 | 
			
		||||
@ -50,18 +49,18 @@ def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, An
 | 
			
		||||
        for data in info[TRACKS][0][ARTISTS]:
 | 
			
		||||
            artists.append(data[NAME])
 | 
			
		||||
            # query artist genres via href, which will be the api url
 | 
			
		||||
            with Loader("Fetching artist information..."):            
 | 
			
		||||
            with Loader(PrintChannel.PROGRESS_INFO, "Fetching artist information..."):
 | 
			
		||||
                (raw, artistInfo) = ZSpotify.invoke_url(f'{data["href"]}')
 | 
			
		||||
            if ZSpotify.CONFIG.get_allGenres() and len(artistInfo[GENRES]) > 0:
 | 
			
		||||
                for genre in artistInfo[GENRES]:
 | 
			
		||||
                    genres.append(genre)
 | 
			
		||||
            elif len(artistInfo[GENRES]) > 0:
 | 
			
		||||
                genres.append(artistInfo[GENRES][0])
 | 
			
		||||
         
 | 
			
		||||
 | 
			
		||||
        if len(genres) == 0:
 | 
			
		||||
            Printer.print(PrintChannel.SKIPS, '###    No Genre found.')
 | 
			
		||||
            genres.append('')   
 | 
			
		||||
                
 | 
			
		||||
            Printer.print(PrintChannel.WARNINGS, '###    No Genres found for song ' + info[TRACKS][0][NAME])
 | 
			
		||||
            genres.append('')
 | 
			
		||||
 | 
			
		||||
        album_name = info[TRACKS][0][ALBUM][NAME]
 | 
			
		||||
        name = info[TRACKS][0][NAME]
 | 
			
		||||
        image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
 | 
			
		||||
@ -93,19 +92,23 @@ def get_song_duration(song_id: str) -> float:
 | 
			
		||||
 | 
			
		||||
    return duration
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# noinspection PyBroadException
 | 
			
		||||
def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=False) -> None:
 | 
			
		||||
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None:
 | 
			
		||||
    """ Downloads raw song audio from Spotify """
 | 
			
		||||
 | 
			
		||||
    if extra_keys is None:
 | 
			
		||||
        extra_keys = {}
 | 
			
		||||
 | 
			
		||||
    prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
 | 
			
		||||
    prepare_download_loader.start()
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        output_template = ZSpotify.CONFIG.get_output(mode)
 | 
			
		||||
 | 
			
		||||
        (artists, genres, album_name, name, image_url, release_year, disc_number,
 | 
			
		||||
         track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
 | 
			
		||||
        
 | 
			
		||||
        prepareDownloadLoader = Loader("Preparing download...");
 | 
			
		||||
        prepareDownloadLoader.start()
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
 | 
			
		||||
 | 
			
		||||
        for k in extra_keys:
 | 
			
		||||
@ -143,24 +146,27 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
 | 
			
		||||
 | 
			
		||||
            filename = os.path.join(filedir, f'{fname}_{c}{ext}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, '###   SKIPPING SONG - FAILED TO QUERY METADATA   ###')
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
 | 
			
		||||
        for k in extra_keys:
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, "\n")
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, str(e) + "\n")
 | 
			
		||||
        Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        try:
 | 
			
		||||
            if not is_playable:
 | 
			
		||||
                prepareDownloadLoader.stop();
 | 
			
		||||
                prepare_download_loader.stop()
 | 
			
		||||
                Printer.print(PrintChannel.SKIPS, '\n###   SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE)   ###' + "\n")
 | 
			
		||||
            else:
 | 
			
		||||
                if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
 | 
			
		||||
                    prepareDownloadLoader.stop();
 | 
			
		||||
                    prepare_download_loader.stop()
 | 
			
		||||
                    Printer.print(PrintChannel.SKIPS, '\n###   SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS)   ###' + "\n")
 | 
			
		||||
 | 
			
		||||
                elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
 | 
			
		||||
                    prepareDownloadLoader.stop();
 | 
			
		||||
                    prepare_download_loader.stop()
 | 
			
		||||
                    Printer.print(PrintChannel.SKIPS, '\n###   SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE)   ###' + "\n")
 | 
			
		||||
 | 
			
		||||
                else:
 | 
			
		||||
@ -171,7 +177,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
 | 
			
		||||
                    create_download_directory(filedir)
 | 
			
		||||
                    total_size = stream.input_stream.size
 | 
			
		||||
 | 
			
		||||
                    prepareDownloadLoader.stop();
 | 
			
		||||
                    prepare_download_loader.stop()
 | 
			
		||||
 | 
			
		||||
                    time_start = time.time()
 | 
			
		||||
                    downloaded = 0
 | 
			
		||||
@ -217,13 +223,18 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
 | 
			
		||||
                        time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, '###   SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR)   ###')
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
 | 
			
		||||
            for k in extra_keys:
 | 
			
		||||
                Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, "\n")
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, str(e) + "\n")
 | 
			
		||||
            Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
 | 
			
		||||
            if os.path.exists(filename_temp):
 | 
			
		||||
                os.remove(filename_temp)
 | 
			
		||||
                
 | 
			
		||||
    prepareDownloadLoader.stop()
 | 
			
		||||
 | 
			
		||||
    prepare_download_loader.stop()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def convert_audio_format(filename) -> None:
 | 
			
		||||
    """ Converts raw audio into playable file """
 | 
			
		||||
    temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
 | 
			
		||||
@ -250,9 +261,9 @@ def convert_audio_format(filename) -> None:
 | 
			
		||||
        inputs={temp_filename: None},
 | 
			
		||||
        outputs={filename: output_params}
 | 
			
		||||
    )
 | 
			
		||||
    
 | 
			
		||||
    with Loader("Converting file..."):
 | 
			
		||||
 | 
			
		||||
    with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."):
 | 
			
		||||
        ff_m.run()
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
    if os.path.exists(temp_filename):
 | 
			
		||||
        os.remove(temp_filename)
 | 
			
		||||
 | 
			
		||||
@ -280,84 +280,3 @@ def fmt_seconds(secs: float) -> str:
 | 
			
		||||
        return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
 | 
			
		||||
    else:
 | 
			
		||||
        return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# load symbol from:
 | 
			
		||||
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
 | 
			
		||||
 | 
			
		||||
# imports
 | 
			
		||||
from itertools import cycle
 | 
			
		||||
from shutil import get_terminal_size
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
class Loader:
 | 
			
		||||
    """Busy symbol.
 | 
			
		||||
 | 
			
		||||
    Can be called inside a context:
 | 
			
		||||
 | 
			
		||||
    with Loader("This take some Time..."):
 | 
			
		||||
        # do something
 | 
			
		||||
        pass          
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, desc="Loading...", end='', timeout=0.1, mode='std1'):
 | 
			
		||||
        """
 | 
			
		||||
        A loader-like context manager
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            desc (str, optional): The loader's description. Defaults to "Loading...".
 | 
			
		||||
            end (str, optional): Final print. Defaults to "".
 | 
			
		||||
            timeout (float, optional): Sleep time between prints. Defaults to 0.1.
 | 
			
		||||
        """
 | 
			
		||||
        self.desc = desc
 | 
			
		||||
        self.end = end
 | 
			
		||||
        self.timeout = timeout
 | 
			
		||||
 | 
			
		||||
        self._thread = Thread(target=self._animate, daemon=True)
 | 
			
		||||
        if mode == 'std1':
 | 
			
		||||
            self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
 | 
			
		||||
        elif mode == 'std2':
 | 
			
		||||
            self.steps = ["◜","◝","◞","◟"]
 | 
			
		||||
        elif mode == 'std3':
 | 
			
		||||
            self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "]
 | 
			
		||||
        elif mode == 'prog':
 | 
			
		||||
            self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
 | 
			
		||||
 | 
			
		||||
        self.done = False
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
        self._thread.start()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def _animate(self):
 | 
			
		||||
        for c in cycle(self.steps):
 | 
			
		||||
            if self.done:
 | 
			
		||||
                break
 | 
			
		||||
            print(f"\r\t{c} {self.desc} ", flush=True, end="")
 | 
			
		||||
            sleep(self.timeout)
 | 
			
		||||
 | 
			
		||||
    def __enter__(self):
 | 
			
		||||
        self.start()
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        self.done = True
 | 
			
		||||
        cols = get_terminal_size((80, 20)).columns
 | 
			
		||||
        print("\r" + " " * cols, end="", flush=True)
 | 
			
		||||
 | 
			
		||||
        if self.end != "":
 | 
			
		||||
            print(f"\r{self.end}", flush=True)
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, exc_type, exc_value, tb):
 | 
			
		||||
        # handle exceptions with those variables ^
 | 
			
		||||
        self.stop()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    with Loader("Loading with context manager..."):
 | 
			
		||||
        for i in range(10):
 | 
			
		||||
            sleep(0.25)
 | 
			
		||||
 | 
			
		||||
    loader = Loader("Loading with object...", "That was fast!", 0.05).start()
 | 
			
		||||
    for i in range(10):
 | 
			
		||||
        sleep(0.25)
 | 
			
		||||
    loader.stop()
 | 
			
		||||
 | 
			
		||||
@ -86,16 +86,18 @@ class ZSpotify:
 | 
			
		||||
        from termoutput import Printer, PrintChannel                
 | 
			
		||||
        headers = cls.get_auth_header()
 | 
			
		||||
        response = requests.get(url, headers=headers)
 | 
			
		||||
        responseText = response.text
 | 
			
		||||
        responseJson = response.json()
 | 
			
		||||
        responsetext = response.text
 | 
			
		||||
        responsejson = response.json()
 | 
			
		||||
        
 | 
			
		||||
        if 'error' in responseJson and tryCount < 5:
 | 
			
		||||
            
 | 
			
		||||
            Printer.Print(PrintChannel.API_ERROR, f"Spotify API Error ({responseJson['error']['status']}): {responseJson['error']['message']}")            
 | 
			
		||||
            time.sleep(5)
 | 
			
		||||
            return cls.invoke_url(url, tryCount + 1)
 | 
			
		||||
                
 | 
			
		||||
        return responseText, responseJson
 | 
			
		||||
        if 'error' in responsejson:
 | 
			
		||||
            if tryCount < 5:
 | 
			
		||||
                Printer.print(PrintChannel.WARNINGS, f"Spotify API Error (try {tryCount}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
 | 
			
		||||
                time.sleep(5)
 | 
			
		||||
                return cls.invoke_url(url, tryCount + 1)
 | 
			
		||||
 | 
			
		||||
            Printer.print(PrintChannel.API_ERRORS, f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
 | 
			
		||||
 | 
			
		||||
        return responsetext, responsejson
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def check_premium(cls) -> bool:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user