diff --git a/zspotify/__main__.py b/zspotify/__main__.py index 0311fc2f..b06ade3c 100644 --- a/zspotify/__main__.py +++ b/zspotify/__main__.py @@ -1,8 +1,7 @@ import argparse from app import client - - +from config import CONFIG_VALUES if __name__ == '__main__': parser = argparse.ArgumentParser(prog='zspotify', @@ -10,6 +9,9 @@ if __name__ == '__main__': parser.add_argument('-ns', '--no-splash', action='store_true', help='Suppress the splash screen when loading.') + parser.add_argument('--config-location', + type=str, + help='Specify the zs_config.json location') group = parser.add_mutually_exclusive_group(required=True) group.add_argument('urls', type=str, @@ -32,6 +34,12 @@ if __name__ == '__main__': type=str, help='Downloads tracks, playlists and albums from the URLs written in the file passed.') + for configkey in CONFIG_VALUES: + parser.add_argument(CONFIG_VALUES[configkey]['arg'], + type=str, + default=None, + help='Specify the value of the ['+configkey+'] config value') + parser.set_defaults(func=client) args = parser.parse_args() diff --git a/zspotify/album.py b/zspotify/album.py index 5325caa9..8aea1199 100644 --- a/zspotify/album.py +++ b/zspotify/album.py @@ -1,6 +1,5 @@ -from tqdm import tqdm - from const import ITEMS, ARTISTS, NAME, ID +from termoutput import Printer from track import download_track from utils import fix_filename from zspotify import ZSpotify @@ -47,12 +46,9 @@ def get_artist_albums(artist_id): def download_album(album): """ Downloads songs from an album """ artist, album_name = get_album_name(album) - artist_fixed = fix_filename(artist) - album_name_fixed = fix_filename(album_name) tracks = get_album_tracks(album) - for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): - download_track(track[ID], f'{artist_fixed}/{album_name_fixed}', - prefix=True, prefix_value=str(n), disable_progressbar=True) + for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): + download_track('album', track[ID], extra_keys={'album_num': str(n).zfill(2), 'artist': artist, 'album': album_name, 'album_id': album}, disable_progressbar=True) def download_artist_albums(artist): diff --git a/zspotify/app.py b/zspotify/app.py index 2d315094..75be3156 100644 --- a/zspotify/app.py +++ b/zspotify/app.py @@ -7,6 +7,7 @@ from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALB OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist from podcast import download_episode, get_show_episodes +from termoutput import Printer, PrintChannel from track import download_track, get_saved_tracks from utils import fix_filename, splash, split_input, regex_input_for_urls from zspotify import ZSpotify @@ -16,18 +17,15 @@ SEARCH_URL = 'https://api.spotify.com/v1/search' def client(args) -> None: """ Connects to spotify to perform query's and get songs to download """ - ZSpotify() + ZSpotify(args) - if not args.no_splash: - splash() + Printer.print(PrintChannel.SPLASH, splash()) if ZSpotify.check_premium(): - if not args.no_splash: - print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n') + Printer.print(PrintChannel.SPLASH, '[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n') ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH else: - if not args.no_splash: - print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n') + Printer.print(PrintChannel.SPLASH, '[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n') ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH if args.download: @@ -40,7 +38,7 @@ def client(args) -> None: download_from_urls(urls) else: - print(f'File {filename} not found.\n') + Printer.print(PrintChannel.ERRORS, f'File {filename} not found.\n') if args.urls: download_from_urls(args.urls) @@ -51,11 +49,9 @@ def client(args) -> None: if args.liked_songs: for song in get_saved_tracks(): if not song[TRACK][NAME]: - print( - '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###') + Printer.print(PrintChannel.ERRORS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n") else: - download_track(song[TRACK][ID], 'Liked Songs/') - print('\n') + download_track('liked', song[TRACK][ID]) if args.search_spotify: search_text = '' @@ -75,7 +71,7 @@ def download_from_urls(urls: list[str]) -> bool: if track_id is not None: download = True - download_track(track_id) + download_track('single', track_id) elif artist_id is not None: download = True download_artist_albums(artist_id) @@ -87,9 +83,7 @@ def download_from_urls(urls: list[str]) -> bool: playlist_songs = get_playlist_songs(playlist_id) name, _ = get_playlist_info(playlist_id) for song in playlist_songs: - download_track(song[TRACK][ID], - fix_filename(name) + '/') - print('\n') + download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name}) elif episode_id is not None: download = True download_episode(episode_id) @@ -273,7 +267,7 @@ def search(search_term): print_pos = dics.index(dic) + 1 if print_pos == position: if dic['type'] == TRACK: - download_track(dic[ID]) + download_track('single', dic[ID]) elif dic['type'] == ALBUM: download_album(dic[ID]) elif dic['type'] == ARTIST: diff --git a/zspotify/config.py b/zspotify/config.py new file mode 100644 index 00000000..79f79638 --- /dev/null +++ b/zspotify/config.py @@ -0,0 +1,220 @@ +import json +import os +import sys +from typing import Any +from enum import Enum + +CONFIG_FILE_PATH = '../zs_config.json' + +ROOT_PATH = 'ROOT_PATH' +ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' +SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES' +SKIP_PREVIOUSLY_DOWNLOADED = 'SKIP_PREVIOUSLY_DOWNLOADED' +DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT' +FORCE_PREMIUM = 'FORCE_PREMIUM' +ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME' +OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT' +CHUNK_SIZE = 'CHUNK_SIZE' +SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS' +DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME' +LANGUAGE = 'LANGUAGE' +BITRATE = 'BITRATE' +SONG_ARCHIVE = 'SONG_ARCHIVE' +CREDENTIALS_LOCATION = 'CREDENTIALS_LOCATION' +OUTPUT = 'OUTPUT' +PRINT_SPLASH = 'PRINT_SPLASH' +PRINT_SKIPS = 'PRINT_SKIPS' +PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS' +PRINT_ERRORS = 'PRINT_ERRORS' +PRINT_DOWNLOADS = 'PRINT_DOWNLOADS' + +CONFIG_VALUES = { + ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' }, + ROOT_PODCAST_PATH: { 'default': '../ZSpotify Podcasts/', 'type': str, 'arg': '--root-podcast-path' }, + SKIP_EXISTING_FILES: { 'default': 'True', 'type': bool, 'arg': '--skip-existing-files' }, + SKIP_PREVIOUSLY_DOWNLOADED: { 'default': 'False', 'type': bool, 'arg': '--skip-previously-downloaded' }, + DOWNLOAD_FORMAT: { 'default': 'ogg', 'type': str, 'arg': '--download-format' }, + FORCE_PREMIUM: { 'default': 'False', 'type': bool, 'arg': '--force-premium' }, + ANTI_BAN_WAIT_TIME: { 'default': '1', 'type': int, 'arg': '--anti-ban-wait-time' }, + OVERRIDE_AUTO_WAIT: { 'default': 'False', 'type': bool, 'arg': '--override-auto-wait' }, + CHUNK_SIZE: { 'default': '50000', 'type': int, 'arg': '--chunk-size' }, + SPLIT_ALBUM_DISCS: { 'default': 'False', 'type': bool, 'arg': '--split-album-discs' }, + DOWNLOAD_REAL_TIME: { 'default': 'False', 'type': bool, 'arg': '--download-real-time' }, + LANGUAGE: { 'default': 'en', 'type': str, 'arg': '--language' }, + BITRATE: { 'default': '', 'type': str, 'arg': '--bitrate' }, + SONG_ARCHIVE: { 'default': '.song_archive', 'type': str, 'arg': '--song-archive' }, + CREDENTIALS_LOCATION: { 'default': 'credentials.json', 'type': str, 'arg': '--credentials-location' }, + OUTPUT: { 'default': '', 'type': str, 'arg': '--output' }, + PRINT_SPLASH: { 'default': 'True', 'type': bool, 'arg': '--print-splash' }, + PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' }, + PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' }, + PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' }, + PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' }, +} + +OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}' +OUTPUT_DEFAULT_PLAYLIST_EXT = '{playlist}/{playlist_num} - {artist} - {song_name}.{ext}' +OUTPUT_DEFAULT_LIKED_SONGS = 'Liked Songs/{artist} - {song_name}.{ext}' +OUTPUT_DEFAULT_SINGLE = '{artist} - {song_name}.{ext}' +OUTPUT_DEFAULT_ALBUM = '{artist}/{album}/{album_num} - {artist} - {song_name}.{ext}' + +class Config: + Values = {} + + @classmethod + def load(cls, args) -> None: + app_dir = os.path.dirname(__file__) + + config_fp = CONFIG_FILE_PATH + if args.config_location: + config_fp = args.config_location + + true_config_file_path = os.path.join(app_dir, config_fp) + + # Load config from zs_config.json + + if not os.path.exists(true_config_file_path): + with open(true_config_file_path, 'w', encoding='utf-8') as config_file: + json.dump(cls.get_default_json(), config_file, indent=4) + cls.Values = cls.get_default_json() + else: + with open(true_config_file_path, encoding='utf-8') as config_file: + jsonvalues = json.load(config_file) + cls.Values = {} + for key in CONFIG_VALUES: + if key in jsonvalues: + cls.Values[key] = cls.parse_arg_value(key, jsonvalues[key]) + + # Add default values for missing keys + + for key in CONFIG_VALUES: + if key not in cls.Values: + cls.Values[key] = cls.parse_arg_value(key, CONFIG_VALUES[key]['default']) + + # Override config from commandline arguments + + for key in CONFIG_VALUES: + if key.lower() in vars(args) and vars(args)[key.lower()] is not None: + cls.Values[key] = cls.parse_arg_value(key, vars(args)[key.lower()]) + + if args.no_splash: + cls.Values[PRINT_SPLASH] = False + + @classmethod + def get_default_json(cls) -> Any: + r = {} + for key in CONFIG_VALUES: + r[key] = CONFIG_VALUES[key]['default'] + return r + + @classmethod + def parse_arg_value(cls, key: str, value: Any) -> Any: + if type(value) == CONFIG_VALUES[key]['type']: + return value + if CONFIG_VALUES[key]['type'] == str: + return str(value) + if CONFIG_VALUES[key]['type'] == int: + return int(value) + if CONFIG_VALUES[key]['type'] == bool: + if str(value).lower() in ['yes', 'true', '1']: + return True + if str(value).lower() in ['no', 'false', '0']: + return False + raise ValueError("Not a boolean: " + value) + raise ValueError("Unknown Type: " + value) + + @classmethod + def get(cls, key: str) -> Any: + return cls.Values.get(key) + + @classmethod + def get_root_path(cls) -> str: + return cls.get(ROOT_PATH) + + @classmethod + def get_root_podcast_path(cls) -> str: + return cls.get(ROOT_PODCAST_PATH) + + @classmethod + def get_skip_existing_files(cls) -> bool: + return cls.get(SKIP_EXISTING_FILES) + + @classmethod + def get_skip_previously_downloaded(cls) -> bool: + return cls.get(SKIP_PREVIOUSLY_DOWNLOADED) + + @classmethod + def get_split_album_discs(cls) -> bool: + return cls.get(SPLIT_ALBUM_DISCS) + + @classmethod + def get_chunk_size(cls) -> int(): + return cls.get(CHUNK_SIZE) + + @classmethod + def get_override_auto_wait(cls) -> bool: + return cls.get(OVERRIDE_AUTO_WAIT) + + @classmethod + def get_force_premium(cls) -> bool: + return cls.get(FORCE_PREMIUM) + + @classmethod + def get_download_format(cls) -> str: + return cls.get(DOWNLOAD_FORMAT) + + @classmethod + def get_anti_ban_wait_time(cls) -> int: + return cls.get(ANTI_BAN_WAIT_TIME) + + @classmethod + def get_language(cls) -> str: + return cls.get(LANGUAGE) + + @classmethod + def get_download_real_time(cls) -> bool: + return cls.get(DOWNLOAD_REAL_TIME) + + @classmethod + def get_bitrate(cls) -> str: + return cls.get(BITRATE) + + @classmethod + def get_song_archive(cls) -> str: + return cls.get(SONG_ARCHIVE) + + @classmethod + def get_credentials_location(cls) -> str: + return cls.get(CREDENTIALS_LOCATION) + + @classmethod + def get_output(cls, mode: str) -> str: + v = cls.get(OUTPUT) + if v: + return v + if mode == 'playlist': + if cls.get_split_album_discs(): + split = os.path.split(OUTPUT_DEFAULT_PLAYLIST) + return os.path.join(split[0], 'Disc {disc_number}', split[0]) + return OUTPUT_DEFAULT_PLAYLIST + if mode == 'extplaylist': + if cls.get_split_album_discs(): + split = os.path.split(OUTPUT_DEFAULT_PLAYLIST_EXT) + return os.path.join(split[0], 'Disc {disc_number}', split[0]) + return OUTPUT_DEFAULT_PLAYLIST_EXT + if mode == 'liked': + if cls.get_split_album_discs(): + split = os.path.split(OUTPUT_DEFAULT_LIKED_SONGS) + return os.path.join(split[0], 'Disc {disc_number}', split[0]) + return OUTPUT_DEFAULT_LIKED_SONGS + if mode == 'single': + if cls.get_split_album_discs(): + split = os.path.split(OUTPUT_DEFAULT_SINGLE) + return os.path.join(split[0], 'Disc {disc_number}', split[0]) + return OUTPUT_DEFAULT_SINGLE + if mode == 'album': + if cls.get_split_album_discs(): + split = os.path.split(OUTPUT_DEFAULT_ALBUM) + return os.path.join(split[0], 'Disc {disc_number}', split[0]) + return OUTPUT_DEFAULT_ALBUM + raise ValueError() diff --git a/zspotify/const.py b/zspotify/const.py index 06e800f6..057921c1 100644 --- a/zspotify/const.py +++ b/zspotify/const.py @@ -80,34 +80,6 @@ USER_LIBRARY_READ = 'user-library-read' WINDOWS_SYSTEM = 'Windows' -CREDENTIALS_JSON = 'credentials.json' - -CONFIG_FILE_PATH = '../zs_config.json' - -ROOT_PATH = 'ROOT_PATH' - -ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' - -SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES' - -SKIP_PREVIOUSLY_DOWNLOADED = 'SKIP_PREVIOUSLY_DOWNLOADED' - -DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT' - -FORCE_PREMIUM = 'FORCE_PREMIUM' - -ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME' - -OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT' - -CHUNK_SIZE = 'CHUNK_SIZE' - -SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS' - -DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME' - -BITRATE = 'BITRATE' - CODEC_MAP = { 'aac': 'aac', 'fdk_aac': 'libfdk_aac', @@ -127,18 +99,3 @@ EXT_MAP = { 'opus': 'ogg', 'vorbis': 'ogg', } - -CONFIG_DEFAULT_SETTINGS = { - 'ROOT_PATH': '../ZSpotify Music/', - 'ROOT_PODCAST_PATH': '../ZSpotify Podcasts/', - 'SKIP_EXISTING_FILES': True, - 'SKIP_PREVIOUSLY_DOWNLOADED': False, - 'DOWNLOAD_FORMAT': 'ogg', - 'FORCE_PREMIUM': False, - 'ANTI_BAN_WAIT_TIME': 1, - 'OVERRIDE_AUTO_WAIT': False, - 'CHUNK_SIZE': 50000, - 'SPLIT_ALBUM_DISCS': False, - 'DOWNLOAD_REAL_TIME': False, - 'LANGUAGE': 'en' -} diff --git a/zspotify/playlist.py b/zspotify/playlist.py index 4a2b8916..67ddb336 100644 --- a/zspotify/playlist.py +++ b/zspotify/playlist.py @@ -1,6 +1,5 @@ -from tqdm import tqdm - from const import ITEMS, ID, TRACK, NAME +from termoutput import Printer from track import download_track from utils import fix_filename, split_input from zspotify import ZSpotify @@ -51,11 +50,10 @@ def download_playlist(playlist): """Downloads all the songs from a playlist""" playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK][ID]] - p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) + p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) enum = 1 for song in p_bar: - download_track(song[TRACK][ID], fix_filename(playlist[NAME].strip()) + '/', - prefix=True, prefix_value=str(enum) ,disable_progressbar=True) + download_track('extplaylist', song[TRACK][ID], extra_keys={'playlist': playlist[NAME], 'playlist_num': str(enum).zfill(2)}, disable_progressbar=True) p_bar.set_description(song[TRACK][NAME]) enum += 1 diff --git a/zspotify/podcast.py b/zspotify/podcast.py index 265bff7c..88cd4318 100644 --- a/zspotify/podcast.py +++ b/zspotify/podcast.py @@ -3,10 +3,9 @@ from typing import Optional, Tuple from librespot.audio.decoders import VorbisOnlyAudioQuality from librespot.metadata import EpisodeId -from tqdm import tqdm -from const import (CHUNK_SIZE, ERROR, ID, ITEMS, NAME, ROOT_PODCAST_PATH, SHOW, - SKIP_EXISTING_FILES) +from const import (ERROR, ID, ITEMS, NAME, SHOW) +from termoutput import PrintChannel, Printer from utils import create_download_directory, fix_filename from zspotify import ZSpotify @@ -71,7 +70,7 @@ def download_episode(episode_id) -> None: extra_paths = podcast_name + '/' if podcast_name is None: - print('### SKIPPING: (EPISODE NOT FOUND) ###') + Printer.print(PrintChannel.ERRORS, '### SKIPPING: (EPISODE NOT FOUND) ###') else: filename = podcast_name + ' - ' + episode_name @@ -80,7 +79,7 @@ def download_episode(episode_id) -> None: download_directory = os.path.join( os.path.dirname(__file__), - ZSpotify.get_config(ROOT_PODCAST_PATH), + ZSpotify.CONFIG.get_root_podcast_path(), extra_paths, ) download_directory = os.path.realpath(download_directory) @@ -97,27 +96,21 @@ def download_episode(episode_id) -> None: if ( os.path.isfile(filepath) and os.path.getsize(filepath) == total_size - and ZSpotify.get_config(SKIP_EXISTING_FILES) + and ZSpotify.CONFIG.get_skip_existing_files() ): - print( - "\n### SKIPPING:", - podcast_name, - "-", - episode_name, - "(EPISODE ALREADY EXISTS) ###", - ) + Printer.print(PrintChannel.SKIPS, "\n### SKIPPING: " + podcast_name + " - " + episode_name + " (EPISODE ALREADY EXISTS) ###") return - with open(filepath, 'wb') as file, tqdm( + with open(filepath, 'wb') as file, Printer.progress( desc=filename, total=total_size, unit='B', unit_scale=True, unit_divisor=1024 ) as bar: - for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): + for _ in range(int(total_size / ZSpotify.CONFIG.get_chunk_size()) + 1): bar.update(file.write( - stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) + stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size()))) else: filepath = os.path.join(download_directory, f"{filename}.mp3") download_podcast_directly(direct_download_url, filepath) diff --git a/zspotify/termoutput.py b/zspotify/termoutput.py new file mode 100644 index 00000000..bf57e356 --- /dev/null +++ b/zspotify/termoutput.py @@ -0,0 +1,26 @@ +from enum import Enum +from tqdm import tqdm + +from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS +from zspotify import ZSpotify + + +class PrintChannel(Enum): + SPLASH = PRINT_SPLASH + SKIPS = PRINT_SKIPS + DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS + ERRORS = PRINT_ERRORS + DOWNLOADS = PRINT_DOWNLOADS + + +class Printer: + @staticmethod + def print(channel: PrintChannel, msg: str) -> None: + if ZSpotify.CONFIG.get(channel.value): + print(msg) + + @staticmethod + def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000): + if not ZSpotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value): + disable = True + return tqdm(iterable=iterable, desc=desc, total=total, disable=disable, unit=unit, unit_scale=unit_scale, unit_divisor=unit_divisor) diff --git a/zspotify/track.py b/zspotify/track.py index a80ad92e..8f4ef9de 100644 --- a/zspotify/track.py +++ b/zspotify/track.py @@ -7,12 +7,10 @@ from librespot.audio.decoders import AudioQuality from librespot.metadata import TrackId from ffmpy import FFmpeg from pydub import AudioSegment -from tqdm import tqdm from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ - RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, \ - CHUNK_SIZE, SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, BITRATE, CODEC_MAP, EXT_MAP, DOWNLOAD_REAL_TIME, \ - SKIP_PREVIOUSLY_DOWNLOADED, DURATION_MS + RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS +from termoutput import Printer, PrintChannel from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \ get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive from zspotify import ZSpotify @@ -71,58 +69,60 @@ def get_song_duration(song_id: str) -> float: return duration # noinspection PyBroadException -def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None: +def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=False) -> None: """ Downloads raw song audio from Spotify """ try: + output_template = ZSpotify.CONFIG.get_output(mode) + (artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id) - if ZSpotify.get_config(SPLIT_ALBUM_DISCS): - download_directory = os.path.join(os.path.dirname( - __file__), ZSpotify.get_config(ROOT_PATH), extra_paths, f'Disc {disc_number}') - else: - download_directory = os.path.join(os.path.dirname( - __file__), ZSpotify.get_config(ROOT_PATH), extra_paths) - song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) - if prefix: - song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit( - ) else f'{prefix_value} - {song_name}' - filename = os.path.join( - download_directory, f'{song_name}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT).lower())}') + for k in extra_keys: + output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k])) + + output_template = output_template.replace("{artist}", fix_filename(artists[0])) + output_template = output_template.replace("{album}", fix_filename(album_name)) + output_template = output_template.replace("{song_name}", fix_filename(name)) + output_template = output_template.replace("{release_year}", fix_filename(release_year)) + output_template = output_template.replace("{disc_number}", fix_filename(disc_number)) + output_template = output_template.replace("{track_number}", fix_filename(track_number)) + output_template = output_template.replace("{id}", fix_filename(scraped_song_id)) + output_template = output_template.replace("{track_id}", fix_filename(track_id)) + output_template = output_template.replace("{ext}", EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower())) + + filename = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), output_template) + filedir = os.path.dirname(filename) - archive_directory = os.path.join(os.path.dirname(__file__), ZSpotify.get_config(ROOT_PATH)) check_name = os.path.isfile(filename) and os.path.getsize(filename) - check_id = scraped_song_id in get_directory_song_ids(download_directory) - check_all_time = scraped_song_id in get_previously_downloaded(scraped_song_id, archive_directory) + check_id = scraped_song_id in get_directory_song_ids(filedir) + check_all_time = scraped_song_id in get_previously_downloaded() # a song with the same name is installed if not check_id and check_name: - c = len([file for file in os.listdir(download_directory) - if re.search(f'^{song_name}_', file)]) + 1 + c = len([file for file in os.listdir(filedir) if re.search(f'^{filename}_', str(file))]) + 1 - filename = os.path.join( - download_directory, f'{song_name}_{c}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT))}') + fname = os.path.splitext(os.path.basename(filename))[0] + ext = os.path.splitext(os.path.basename(filename))[1] + + filename = os.path.join(filedir, f'{fname}_{c}{ext}') except Exception as e: - print('### SKIPPING SONG - FAILED TO QUERY METADATA ###') - print(e) + Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###') + Printer.print(PrintChannel.ERRORS, str(e) + "\n") else: try: if not is_playable: - print('\n### SKIPPING:', song_name, - '(SONG IS UNAVAILABLE) ###') + Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n") else: - if check_id and check_name and ZSpotify.get_config(SKIP_EXISTING_FILES): - print('\n### SKIPPING:', song_name, - '(SONG ALREADY EXISTS) ###') + if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files(): + Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n") - elif check_all_time and ZSpotify.get_config(SKIP_PREVIOUSLY_DOWNLOADED): - print('\n### SKIPPING:', song_name, - '(SONG ALREADY DOWNLOADED ONCE) ###') + elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded(): + Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n") else: if track_id != scraped_song_id: @@ -130,10 +130,10 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', track_id = TrackId.from_base62(track_id) stream = ZSpotify.get_content_stream( track_id, ZSpotify.DOWNLOAD_QUALITY) - create_download_directory(download_directory) + create_download_directory(filedir) total_size = stream.input_stream.size - with open(filename, 'wb') as file, tqdm( + with open(filename, 'wb') as file, Printer.progress( desc=song_name, total=total_size, unit='B', @@ -141,31 +141,31 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', unit_divisor=1024, disable=disable_progressbar ) as p_bar: - pause = duration_ms / ZSpotify.get_config(CHUNK_SIZE) - for chunk in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): - data = stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)) + pause = duration_ms / ZSpotify.CONFIG.get_chunk_size() + for chunk in range(int(total_size / ZSpotify.CONFIG.get_chunk_size()) + 1): + data = stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size()) p_bar.update(file.write(data)) - if ZSpotify.get_config(DOWNLOAD_REAL_TIME): + if ZSpotify.CONFIG.get_download_real_time(): time.sleep(pause) convert_audio_format(filename) - set_audio_tags(filename, artists, name, album_name, - release_year, disc_number, track_number) + set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number) set_music_thumbnail(filename, image_url) + Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, os.path.dirname(__file__))}" ###' + "\n") + # add song id to archive file - if ZSpotify.get_config(SKIP_PREVIOUSLY_DOWNLOADED): - add_to_archive(scraped_song_id, archive_directory) + if ZSpotify.CONFIG.get_skip_previously_downloaded(): + add_to_archive(scraped_song_id, os.path.basename(filename), artists[0], name) # add song id to download directory's .song_ids file if not check_id: - add_to_directory_song_ids(download_directory, scraped_song_id) + add_to_directory_song_ids(filedir, scraped_song_id, os.path.basename(filename), artists[0], name) - if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT): - time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME)) + if not ZSpotify.CONFIG.get_anti_ban_wait_time(): + time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time()) except Exception as e: - print('### SKIPPING:', song_name, - '(GENERAL DOWNLOAD ERROR) ###') - print(e) + Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###') + Printer.print(PrintChannel.ERRORS, str(e) + "\n") if os.path.exists(filename): os.remove(filename) @@ -175,10 +175,10 @@ def convert_audio_format(filename) -> None: temp_filename = f'{os.path.splitext(filename)[0]}.tmp' os.replace(filename, temp_filename) - download_format = ZSpotify.get_config(DOWNLOAD_FORMAT).lower() + download_format = ZSpotify.CONFIG.get_download_format().lower() file_codec = CODEC_MAP.get(download_format, 'copy') if file_codec != 'copy': - bitrate = ZSpotify.get_config(BITRATE) + bitrate = ZSpotify.CONFIG.get_bitrate() if not bitrate: if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH: bitrate = '320k' diff --git a/zspotify/utils.py b/zspotify/utils.py index 6c67c6b9..c7da4c39 100644 --- a/zspotify/utils.py +++ b/zspotify/utils.py @@ -1,3 +1,4 @@ +import datetime import os import platform import re @@ -11,6 +12,8 @@ import requests from const import ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \ WINDOWS_SYSTEM, ALBUMARTIST +from zspotify import ZSpotify + class MusicFormat(str, Enum): MP3 = 'mp3', @@ -27,29 +30,29 @@ def create_download_directory(download_path: str) -> None: with open(hidden_file_path, 'w', encoding='utf-8') as f: pass -def get_previously_downloaded(song_id: str, archive_directory: str) -> List[str]: +def get_previously_downloaded() -> List[str]: """ Returns list of all time downloaded songs """ ids = [] - archive_path = os.path.join(archive_directory, '.song_archive') + archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive()) if os.path.exists(archive_path): with open(archive_path, 'r', encoding='utf-8') as f: - ids = [line.strip() for line in f.readlines()] + ids = [line.strip().split('\t')[0] for line in f.readlines()] return ids -def add_to_archive(song_id: str, archive_directory: str) -> None: +def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str) -> None: """ Adds song id to all time installed songs archive """ - archive_path = os.path.join(archive_directory, '.song_archive') + archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive()) if os.path.exists(archive_path): - with open(archive_path, 'a', encoding='utf-8') as f: - f.write(f'{song_id}\n') + with open(archive_path, 'a', encoding='utf-8') as file: + file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n') else: - with open(archive_path, 'w', encoding='utf-8') as f: - f.write(f'{song_id}\n') + with open(archive_path, 'w', encoding='utf-8') as file: + file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n') def get_directory_song_ids(download_path: str) -> List[str]: """ Gets song ids of songs in directory """ @@ -59,18 +62,18 @@ def get_directory_song_ids(download_path: str) -> List[str]: hidden_file_path = os.path.join(download_path, '.song_ids') if os.path.isfile(hidden_file_path): with open(hidden_file_path, 'r', encoding='utf-8') as file: - song_ids.extend([line.strip() for line in file.readlines()]) + song_ids.extend([line.strip().split('\t')[0] for line in file.readlines()]) return song_ids -def add_to_directory_song_ids(download_path: str, song_id: str) -> None: +def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, author_name: str, song_name: str) -> None: """ Appends song_id to .song_ids file in directory """ hidden_file_path = os.path.join(download_path, '.song_ids') # not checking if file exists because we need an exception # to be raised if something is wrong with open(hidden_file_path, 'a', encoding='utf-8') as file: - file.write(f'{song_id}\n') + file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n') def get_downloaded_song_duration(filename: str) -> float: """ Returns the downloaded file's duration in seconds """ @@ -83,12 +86,6 @@ def get_downloaded_song_duration(filename: str) -> float: return duration -def wait(seconds: int = 3) -> None: - """ Pause for a set number of seconds """ - for second in range(seconds)[::-1]: - print(f'\rWait for {second + 1} second(s)...', end='') - time.sleep(1) - def split_input(selection) -> List[str]: """ Returns a list of inputted strings """ @@ -103,15 +100,15 @@ def split_input(selection) -> List[str]: return inputs -def splash() -> None: +def splash() -> str: """ Displays splash screen """ - print(""" + return """ ███████ ███████ ██████ ██████ ████████ ██ ███████ ██ ██ ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ███ ███████ ██████ ██ ██ ██ ██ █████ ████ ███ ██ ██ ██ ██ ██ ██ ██ ██ ███████ ███████ ██ ██████ ██ ██ ██ ██ - """) + """ def clear() -> None: @@ -253,4 +250,4 @@ def fix_filename(name): >>> all('_' == fix_filename(chr(i)) for i in list(range(32))) True """ - return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", name, flags=re.IGNORECASE) + return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", str(name), flags=re.IGNORECASE) diff --git a/zspotify/zspotify.py b/zspotify/zspotify.py index a3f5e600..51ef6c64 100644 --- a/zspotify/zspotify.py +++ b/zspotify/zspotify.py @@ -16,28 +16,30 @@ import requests from librespot.audio.decoders import VorbisOnlyAudioQuality from librespot.core import Session -from const import CREDENTIALS_JSON, TYPE, \ - PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, \ - PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ, CONFIG_DEFAULT_SETTINGS -from utils import MusicFormat +from const import TYPE, \ + PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, \ + PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ +from config import Config class ZSpotify: SESSION: Session = None DOWNLOAD_QUALITY = None - CONFIG = {} + CONFIG: Config = Config() - def __init__(self): - ZSpotify.load_config() + def __init__(self, args): + ZSpotify.CONFIG.load(args) ZSpotify.login() @classmethod def login(cls): """ Authenticates with Spotify and saves credentials to a file """ - if os.path.isfile(CREDENTIALS_JSON): + cred_location = os.path.join(os.getcwd(), Config.get_credentials_location()) + + if os.path.isfile(cred_location): try: - cls.SESSION = Session.Builder().stored_file().create() + cls.SESSION = Session.Builder().stored_file(cred_location).create() return except RuntimeError: pass @@ -52,22 +54,6 @@ class ZSpotify: except RuntimeError: pass - @classmethod - def load_config(cls) -> None: - app_dir = os.path.dirname(__file__) - true_config_file_path = os.path.join(app_dir, CONFIG_FILE_PATH) - if not os.path.exists(true_config_file_path): - with open(true_config_file_path, 'w', encoding='utf-8') as config_file: - json.dump(CONFIG_DEFAULT_SETTINGS, config_file, indent=4) - cls.CONFIG = CONFIG_DEFAULT_SETTINGS - else: - with open(true_config_file_path, encoding='utf-8') as config_file: - cls.CONFIG = json.load(config_file) - - @classmethod - def get_config(cls, key) -> Any: - return cls.CONFIG.get(key) - @classmethod def get_content_stream(cls, content_id, quality): return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None) @@ -80,14 +66,14 @@ class ZSpotify: def get_auth_header(cls): return { 'Authorization': f'Bearer {cls.__get_auth_token()}', - 'Accept-Language': f'{cls.CONFIG.get("LANGUAGE")}' + 'Accept-Language': f'{cls.CONFIG.get_language()}' } @classmethod def get_auth_header_and_params(cls, limit, offset): return { 'Authorization': f'Bearer {cls.__get_auth_token()}', - 'Accept-Language': f'{cls.CONFIG.get("LANGUAGE")}' + 'Accept-Language': f'{cls.CONFIG.get_language()}' }, {LIMIT: limit, OFFSET: offset} @classmethod @@ -104,4 +90,4 @@ class ZSpotify: @classmethod def check_premium(cls) -> bool: """ If user has spotify premium return true """ - return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.get_config(FORCE_PREMIUM) + return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.CONFIG.get_force_premium()