mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2024-11-29 19:24:34 +01:00
commit
2f9766859a
@ -48,7 +48,7 @@ def client(args) -> None:
|
|||||||
|
|
||||||
if args.liked_songs:
|
if args.liked_songs:
|
||||||
for song in get_saved_tracks():
|
for song in get_saved_tracks():
|
||||||
if not song[TRACK][NAME]:
|
if not song[TRACK][NAME] or not song[TRACK][ID]:
|
||||||
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
|
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
|
||||||
else:
|
else:
|
||||||
download_track('liked', song[TRACK][ID])
|
download_track('liked', song[TRACK][ID])
|
||||||
@ -85,10 +85,17 @@ def download_from_urls(urls: list[str]) -> bool:
|
|||||||
enum = 1
|
enum = 1
|
||||||
char_num = len(str(len(playlist_songs)))
|
char_num = len(str(len(playlist_songs)))
|
||||||
for song in playlist_songs:
|
for song in playlist_songs:
|
||||||
if not song[TRACK][NAME]:
|
if not song[TRACK][NAME] or not song[TRACK][ID]:
|
||||||
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
|
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
|
||||||
else:
|
else:
|
||||||
download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name, 'playlist_num': str(enum).zfill(char_num)})
|
download_track('playlist', song[TRACK][ID], extra_keys=
|
||||||
|
{
|
||||||
|
'playlist_song_name': song[TRACK][NAME],
|
||||||
|
'playlist': name,
|
||||||
|
'playlist_num': str(enum).zfill(char_num),
|
||||||
|
'playlist_id': playlist_id,
|
||||||
|
'playlist_track_id': song[TRACK][ID]
|
||||||
|
})
|
||||||
enum += 1
|
enum += 1
|
||||||
elif episode_id is not None:
|
elif episode_id is not None:
|
||||||
download = True
|
download = True
|
||||||
|
@ -29,6 +29,8 @@ PRINT_API_ERRORS = 'PRINT_API_ERRORS'
|
|||||||
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
|
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
|
||||||
MD_ALLGENRES = 'MD_ALLGENRES'
|
MD_ALLGENRES = 'MD_ALLGENRES'
|
||||||
MD_GENREDELIMITER = 'MD_GENREDELIMITER'
|
MD_GENREDELIMITER = 'MD_GENREDELIMITER'
|
||||||
|
PRINT_PROGRESS_INFO = 'PRINT_PROGRESS_INFO'
|
||||||
|
PRINT_WARNINGS = 'PRINT_WARNINGS'
|
||||||
|
|
||||||
CONFIG_VALUES = {
|
CONFIG_VALUES = {
|
||||||
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
|
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
|
||||||
@ -53,6 +55,8 @@ CONFIG_VALUES = {
|
|||||||
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
|
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
|
||||||
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
|
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
|
||||||
PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' },
|
PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' },
|
||||||
|
PRINT_PROGRESS_INFO: { 'default': 'True', 'type': bool, 'arg': '--print-progress-info' },
|
||||||
|
PRINT_WARNINGS: { 'default': 'True', 'type': bool, 'arg': '--print-warnings' },
|
||||||
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
|
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
|
||||||
MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' },
|
MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' },
|
||||||
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
|
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
|
||||||
@ -154,7 +158,7 @@ class Config:
|
|||||||
return cls.get(SPLIT_ALBUM_DISCS)
|
return cls.get(SPLIT_ALBUM_DISCS)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_chunk_size(cls) -> int():
|
def get_chunk_size(cls) -> int:
|
||||||
return cls.get(CHUNK_SIZE)
|
return cls.get(CHUNK_SIZE)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
72
zspotify/loader.py
Normal file
72
zspotify/loader.py
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
# load symbol from:
|
||||||
|
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
|
||||||
|
|
||||||
|
# imports
|
||||||
|
from itertools import cycle
|
||||||
|
from shutil import get_terminal_size
|
||||||
|
from threading import Thread
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
from termoutput import Printer
|
||||||
|
|
||||||
|
|
||||||
|
class Loader:
|
||||||
|
"""Busy symbol.
|
||||||
|
|
||||||
|
Can be called inside a context:
|
||||||
|
|
||||||
|
with Loader("This take some Time..."):
|
||||||
|
# do something
|
||||||
|
pass
|
||||||
|
"""
|
||||||
|
def __init__(self, chan, desc="Loading...", end='', timeout=0.1, mode='std1'):
|
||||||
|
"""
|
||||||
|
A loader-like context manager
|
||||||
|
|
||||||
|
Args:
|
||||||
|
desc (str, optional): The loader's description. Defaults to "Loading...".
|
||||||
|
end (str, optional): Final print. Defaults to "".
|
||||||
|
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
|
||||||
|
"""
|
||||||
|
self.desc = desc
|
||||||
|
self.end = end
|
||||||
|
self.timeout = timeout
|
||||||
|
self.channel = chan
|
||||||
|
|
||||||
|
self._thread = Thread(target=self._animate, daemon=True)
|
||||||
|
if mode == 'std1':
|
||||||
|
self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
|
||||||
|
elif mode == 'std2':
|
||||||
|
self.steps = ["◜","◝","◞","◟"]
|
||||||
|
elif mode == 'std3':
|
||||||
|
self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "]
|
||||||
|
elif mode == 'prog':
|
||||||
|
self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
|
||||||
|
|
||||||
|
self.done = False
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self._thread.start()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _animate(self):
|
||||||
|
for c in cycle(self.steps):
|
||||||
|
if self.done:
|
||||||
|
break
|
||||||
|
Printer.print_loader(self.channel, f"\r\t{c} {self.desc} ")
|
||||||
|
sleep(self.timeout)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.done = True
|
||||||
|
cols = get_terminal_size((80, 20)).columns
|
||||||
|
Printer.print_loader(self.channel, "\r" + " " * cols)
|
||||||
|
|
||||||
|
if self.end != "":
|
||||||
|
Printer.print_loader(self.channel, f"\r{self.end}")
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, tb):
|
||||||
|
# handle exceptions with those variables ^
|
||||||
|
self.stop()
|
@ -1,7 +1,8 @@
|
|||||||
|
import sys
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS, PRINT_API_ERRORS
|
from config import *
|
||||||
from zspotify import ZSpotify
|
from zspotify import ZSpotify
|
||||||
|
|
||||||
|
|
||||||
@ -10,16 +11,29 @@ class PrintChannel(Enum):
|
|||||||
SKIPS = PRINT_SKIPS
|
SKIPS = PRINT_SKIPS
|
||||||
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
|
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
|
||||||
ERRORS = PRINT_ERRORS
|
ERRORS = PRINT_ERRORS
|
||||||
|
WARNINGS = PRINT_WARNINGS
|
||||||
DOWNLOADS = PRINT_DOWNLOADS
|
DOWNLOADS = PRINT_DOWNLOADS
|
||||||
API_ERRORS = PRINT_API_ERRORS
|
API_ERRORS = PRINT_API_ERRORS
|
||||||
|
PROGRESS_INFO = PRINT_PROGRESS_INFO
|
||||||
|
|
||||||
|
|
||||||
|
ERROR_CHANNEL = [PrintChannel.ERRORS, PrintChannel.API_ERRORS]
|
||||||
|
|
||||||
|
|
||||||
class Printer:
|
class Printer:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def print(channel: PrintChannel, msg: str) -> None:
|
def print(channel: PrintChannel, msg: str) -> None:
|
||||||
if ZSpotify.CONFIG.get(channel.value):
|
if ZSpotify.CONFIG.get(channel.value):
|
||||||
|
if channel in ERROR_CHANNEL:
|
||||||
|
print(msg, file=sys.stderr)
|
||||||
|
else:
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def print_loader(channel: PrintChannel, msg: str) -> None:
|
||||||
|
if ZSpotify.CONFIG.get(channel.value):
|
||||||
|
print(msg, flush=True, end="")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
|
def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
|
||||||
if not ZSpotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value):
|
if not ZSpotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value):
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from threading import Thread
|
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any, Tuple, List
|
from typing import Any, Tuple, List
|
||||||
@ -9,7 +8,7 @@ from librespot.audio.decoders import AudioQuality
|
|||||||
from librespot.metadata import TrackId
|
from librespot.metadata import TrackId
|
||||||
from ffmpy import FFmpeg
|
from ffmpy import FFmpeg
|
||||||
|
|
||||||
from const import TRACKS, ALBUM, GENRES, GENRE, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
from const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
||||||
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
|
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
|
||||||
from termoutput import Printer, PrintChannel
|
from termoutput import Printer, PrintChannel
|
||||||
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
|
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
|
||||||
@ -17,7 +16,7 @@ from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_down
|
|||||||
from zspotify import ZSpotify
|
from zspotify import ZSpotify
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
from utils import Loader
|
from loader import Loader
|
||||||
|
|
||||||
def get_saved_tracks() -> list:
|
def get_saved_tracks() -> list:
|
||||||
""" Returns user's saved tracks """
|
""" Returns user's saved tracks """
|
||||||
@ -38,7 +37,7 @@ def get_saved_tracks() -> list:
|
|||||||
|
|
||||||
def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
|
def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
|
||||||
""" Retrieves metadata for downloaded songs """
|
""" Retrieves metadata for downloaded songs """
|
||||||
with Loader("Fetching track information..."):
|
with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
|
||||||
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
|
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
|
||||||
|
|
||||||
if not TRACKS in info:
|
if not TRACKS in info:
|
||||||
@ -50,7 +49,7 @@ def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, An
|
|||||||
for data in info[TRACKS][0][ARTISTS]:
|
for data in info[TRACKS][0][ARTISTS]:
|
||||||
artists.append(data[NAME])
|
artists.append(data[NAME])
|
||||||
# query artist genres via href, which will be the api url
|
# query artist genres via href, which will be the api url
|
||||||
with Loader("Fetching artist information..."):
|
with Loader(PrintChannel.PROGRESS_INFO, "Fetching artist information..."):
|
||||||
(raw, artistInfo) = ZSpotify.invoke_url(f'{data["href"]}')
|
(raw, artistInfo) = ZSpotify.invoke_url(f'{data["href"]}')
|
||||||
if ZSpotify.CONFIG.get_allGenres() and len(artistInfo[GENRES]) > 0:
|
if ZSpotify.CONFIG.get_allGenres() and len(artistInfo[GENRES]) > 0:
|
||||||
for genre in artistInfo[GENRES]:
|
for genre in artistInfo[GENRES]:
|
||||||
@ -59,7 +58,7 @@ def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, An
|
|||||||
genres.append(artistInfo[GENRES][0])
|
genres.append(artistInfo[GENRES][0])
|
||||||
|
|
||||||
if len(genres) == 0:
|
if len(genres) == 0:
|
||||||
Printer.print(PrintChannel.SKIPS, '### No Genre found.')
|
Printer.print(PrintChannel.WARNINGS, '### No Genres found for song ' + info[TRACKS][0][NAME])
|
||||||
genres.append('')
|
genres.append('')
|
||||||
|
|
||||||
album_name = info[TRACKS][0][ALBUM][NAME]
|
album_name = info[TRACKS][0][ALBUM][NAME]
|
||||||
@ -93,19 +92,23 @@ def get_song_duration(song_id: str) -> float:
|
|||||||
|
|
||||||
return duration
|
return duration
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=False) -> None:
|
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None:
|
||||||
""" Downloads raw song audio from Spotify """
|
""" Downloads raw song audio from Spotify """
|
||||||
|
|
||||||
|
if extra_keys is None:
|
||||||
|
extra_keys = {}
|
||||||
|
|
||||||
|
prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
|
||||||
|
prepare_download_loader.start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
output_template = ZSpotify.CONFIG.get_output(mode)
|
output_template = ZSpotify.CONFIG.get_output(mode)
|
||||||
|
|
||||||
(artists, genres, album_name, name, image_url, release_year, disc_number,
|
(artists, genres, album_name, name, image_url, release_year, disc_number,
|
||||||
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
|
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
|
||||||
|
|
||||||
prepareDownloadLoader = Loader("Preparing download...");
|
|
||||||
prepareDownloadLoader.start()
|
|
||||||
|
|
||||||
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
|
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
|
||||||
|
|
||||||
for k in extra_keys:
|
for k in extra_keys:
|
||||||
@ -143,24 +146,27 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
|||||||
|
|
||||||
filename = os.path.join(filedir, f'{fname}_{c}{ext}')
|
filename = os.path.join(filedir, f'{fname}_{c}{ext}')
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
|
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
|
||||||
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
|
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
|
||||||
|
for k in extra_keys:
|
||||||
|
Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
|
||||||
|
Printer.print(PrintChannel.ERRORS, "\n")
|
||||||
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
|
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
|
||||||
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
|
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
if not is_playable:
|
if not is_playable:
|
||||||
prepareDownloadLoader.stop();
|
prepare_download_loader.stop()
|
||||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
|
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
|
||||||
else:
|
else:
|
||||||
if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
|
if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
|
||||||
prepareDownloadLoader.stop();
|
prepare_download_loader.stop()
|
||||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
|
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
|
||||||
|
|
||||||
elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
|
elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
|
||||||
prepareDownloadLoader.stop();
|
prepare_download_loader.stop()
|
||||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
|
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -171,7 +177,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
|||||||
create_download_directory(filedir)
|
create_download_directory(filedir)
|
||||||
total_size = stream.input_stream.size
|
total_size = stream.input_stream.size
|
||||||
|
|
||||||
prepareDownloadLoader.stop();
|
prepare_download_loader.stop()
|
||||||
|
|
||||||
time_start = time.time()
|
time_start = time.time()
|
||||||
downloaded = 0
|
downloaded = 0
|
||||||
@ -217,13 +223,18 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
|||||||
time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
|
time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
|
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
|
||||||
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
|
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
|
||||||
|
for k in extra_keys:
|
||||||
|
Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
|
||||||
|
Printer.print(PrintChannel.ERRORS, "\n")
|
||||||
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
|
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
|
||||||
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
|
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
|
||||||
if os.path.exists(filename_temp):
|
if os.path.exists(filename_temp):
|
||||||
os.remove(filename_temp)
|
os.remove(filename_temp)
|
||||||
|
|
||||||
prepareDownloadLoader.stop()
|
prepare_download_loader.stop()
|
||||||
|
|
||||||
|
|
||||||
def convert_audio_format(filename) -> None:
|
def convert_audio_format(filename) -> None:
|
||||||
""" Converts raw audio into playable file """
|
""" Converts raw audio into playable file """
|
||||||
temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
|
temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
|
||||||
@ -251,7 +262,7 @@ def convert_audio_format(filename) -> None:
|
|||||||
outputs={filename: output_params}
|
outputs={filename: output_params}
|
||||||
)
|
)
|
||||||
|
|
||||||
with Loader("Converting file..."):
|
with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."):
|
||||||
ff_m.run()
|
ff_m.run()
|
||||||
|
|
||||||
if os.path.exists(temp_filename):
|
if os.path.exists(temp_filename):
|
||||||
|
@ -280,84 +280,3 @@ def fmt_seconds(secs: float) -> str:
|
|||||||
return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
||||||
else:
|
else:
|
||||||
return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
||||||
|
|
||||||
|
|
||||||
# load symbol from:
|
|
||||||
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
|
|
||||||
|
|
||||||
# imports
|
|
||||||
from itertools import cycle
|
|
||||||
from shutil import get_terminal_size
|
|
||||||
from threading import Thread
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
class Loader:
|
|
||||||
"""Busy symbol.
|
|
||||||
|
|
||||||
Can be called inside a context:
|
|
||||||
|
|
||||||
with Loader("This take some Time..."):
|
|
||||||
# do something
|
|
||||||
pass
|
|
||||||
"""
|
|
||||||
def __init__(self, desc="Loading...", end='', timeout=0.1, mode='std1'):
|
|
||||||
"""
|
|
||||||
A loader-like context manager
|
|
||||||
|
|
||||||
Args:
|
|
||||||
desc (str, optional): The loader's description. Defaults to "Loading...".
|
|
||||||
end (str, optional): Final print. Defaults to "".
|
|
||||||
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
|
|
||||||
"""
|
|
||||||
self.desc = desc
|
|
||||||
self.end = end
|
|
||||||
self.timeout = timeout
|
|
||||||
|
|
||||||
self._thread = Thread(target=self._animate, daemon=True)
|
|
||||||
if mode == 'std1':
|
|
||||||
self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
|
|
||||||
elif mode == 'std2':
|
|
||||||
self.steps = ["◜","◝","◞","◟"]
|
|
||||||
elif mode == 'std3':
|
|
||||||
self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "]
|
|
||||||
elif mode == 'prog':
|
|
||||||
self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
|
|
||||||
|
|
||||||
self.done = False
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self._thread.start()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def _animate(self):
|
|
||||||
for c in cycle(self.steps):
|
|
||||||
if self.done:
|
|
||||||
break
|
|
||||||
print(f"\r\t{c} {self.desc} ", flush=True, end="")
|
|
||||||
sleep(self.timeout)
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.done = True
|
|
||||||
cols = get_terminal_size((80, 20)).columns
|
|
||||||
print("\r" + " " * cols, end="", flush=True)
|
|
||||||
|
|
||||||
if self.end != "":
|
|
||||||
print(f"\r{self.end}", flush=True)
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, tb):
|
|
||||||
# handle exceptions with those variables ^
|
|
||||||
self.stop()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
with Loader("Loading with context manager..."):
|
|
||||||
for i in range(10):
|
|
||||||
sleep(0.25)
|
|
||||||
|
|
||||||
loader = Loader("Loading with object...", "That was fast!", 0.05).start()
|
|
||||||
for i in range(10):
|
|
||||||
sleep(0.25)
|
|
||||||
loader.stop()
|
|
||||||
|
@ -86,16 +86,18 @@ class ZSpotify:
|
|||||||
from termoutput import Printer, PrintChannel
|
from termoutput import Printer, PrintChannel
|
||||||
headers = cls.get_auth_header()
|
headers = cls.get_auth_header()
|
||||||
response = requests.get(url, headers=headers)
|
response = requests.get(url, headers=headers)
|
||||||
responseText = response.text
|
responsetext = response.text
|
||||||
responseJson = response.json()
|
responsejson = response.json()
|
||||||
|
|
||||||
if 'error' in responseJson and tryCount < 5:
|
if 'error' in responsejson:
|
||||||
|
if tryCount < 5:
|
||||||
Printer.Print(PrintChannel.API_ERROR, f"Spotify API Error ({responseJson['error']['status']}): {responseJson['error']['message']}")
|
Printer.print(PrintChannel.WARNINGS, f"Spotify API Error (try {tryCount}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
return cls.invoke_url(url, tryCount + 1)
|
return cls.invoke_url(url, tryCount + 1)
|
||||||
|
|
||||||
return responseText, responseJson
|
Printer.print(PrintChannel.API_ERRORS, f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
|
||||||
|
|
||||||
|
return responsetext, responsejson
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def check_premium(cls) -> bool:
|
def check_premium(cls) -> bool:
|
||||||
|
Loading…
Reference in New Issue
Block a user