Merge remote-tracking branch 'upstream/main'

This commit is contained in:
dabreadman 2021-11-21 18:06:54 +00:00
commit 329d0baedb
11 changed files with 367 additions and 192 deletions

View File

@ -1,8 +1,7 @@
import argparse import argparse
from app import client from app import client
from config import CONFIG_VALUES
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='zspotify', parser = argparse.ArgumentParser(prog='zspotify',
@ -10,6 +9,9 @@ if __name__ == '__main__':
parser.add_argument('-ns', '--no-splash', parser.add_argument('-ns', '--no-splash',
action='store_true', action='store_true',
help='Suppress the splash screen when loading.') help='Suppress the splash screen when loading.')
parser.add_argument('--config-location',
type=str,
help='Specify the zs_config.json location')
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('urls', group.add_argument('urls',
type=str, type=str,
@ -32,6 +34,12 @@ if __name__ == '__main__':
type=str, type=str,
help='Downloads tracks, playlists and albums from the URLs written in the file passed.') help='Downloads tracks, playlists and albums from the URLs written in the file passed.')
for configkey in CONFIG_VALUES:
parser.add_argument(CONFIG_VALUES[configkey]['arg'],
type=str,
default=None,
help='Specify the value of the ['+configkey+'] config value')
parser.set_defaults(func=client) parser.set_defaults(func=client)
args = parser.parse_args() args = parser.parse_args()

View File

@ -1,6 +1,5 @@
from tqdm import tqdm
from const import ITEMS, ARTISTS, NAME, ID from const import ITEMS, ARTISTS, NAME, ID
from termoutput import Printer
from track import download_track from track import download_track
from utils import fix_filename from utils import fix_filename
from zspotify import ZSpotify from zspotify import ZSpotify
@ -47,12 +46,9 @@ def get_artist_albums(artist_id):
def download_album(album): def download_album(album):
""" Downloads songs from an album """ """ Downloads songs from an album """
artist, album_name = get_album_name(album) artist, album_name = get_album_name(album)
artist_fixed = fix_filename(artist)
album_name_fixed = fix_filename(album_name)
tracks = get_album_tracks(album) tracks = get_album_tracks(album)
for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
download_track(track[ID], f'{artist_fixed}/{album_name_fixed}', download_track('album', track[ID], extra_keys={'album_num': str(n).zfill(2), 'artist': artist, 'album': album_name, 'album_id': album}, disable_progressbar=True)
prefix=True, prefix_value=str(n), disable_progressbar=True)
def download_artist_albums(artist): def download_artist_albums(artist):

View File

@ -7,6 +7,7 @@ from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALB
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from podcast import download_episode, get_show_episodes from podcast import download_episode, get_show_episodes
from termoutput import Printer, PrintChannel
from track import download_track, get_saved_tracks from track import download_track, get_saved_tracks
from utils import fix_filename, splash, split_input, regex_input_for_urls from utils import fix_filename, splash, split_input, regex_input_for_urls
from zspotify import ZSpotify from zspotify import ZSpotify
@ -16,18 +17,15 @@ SEARCH_URL = 'https://api.spotify.com/v1/search'
def client(args) -> None: def client(args) -> None:
""" Connects to spotify to perform query's and get songs to download """ """ Connects to spotify to perform query's and get songs to download """
ZSpotify() ZSpotify(args)
if not args.no_splash: Printer.print(PrintChannel.SPLASH, splash())
splash()
if ZSpotify.check_premium(): if ZSpotify.check_premium():
if not args.no_splash: Printer.print(PrintChannel.SPLASH, '[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH
else: else:
if not args.no_splash: Printer.print(PrintChannel.SPLASH, '[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH
if args.download: if args.download:
@ -40,7 +38,7 @@ def client(args) -> None:
download_from_urls(urls) download_from_urls(urls)
else: else:
print(f'File {filename} not found.\n') Printer.print(PrintChannel.ERRORS, f'File {filename} not found.\n')
if args.urls: if args.urls:
download_from_urls(args.urls) download_from_urls(args.urls)
@ -51,11 +49,9 @@ def client(args) -> None:
if args.liked_songs: if args.liked_songs:
for song in get_saved_tracks(): for song in get_saved_tracks():
if not song[TRACK][NAME]: if not song[TRACK][NAME]:
print( Printer.print(PrintChannel.ERRORS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
'### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###')
else: else:
download_track(song[TRACK][ID], 'Liked Songs/') download_track('liked', song[TRACK][ID])
print('\n')
if args.search_spotify: if args.search_spotify:
search_text = '' search_text = ''
@ -75,7 +71,7 @@ def download_from_urls(urls: list[str]) -> bool:
if track_id is not None: if track_id is not None:
download = True download = True
download_track(track_id) download_track('single', track_id)
elif artist_id is not None: elif artist_id is not None:
download = True download = True
download_artist_albums(artist_id) download_artist_albums(artist_id)
@ -87,9 +83,7 @@ def download_from_urls(urls: list[str]) -> bool:
playlist_songs = get_playlist_songs(playlist_id) playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id) name, _ = get_playlist_info(playlist_id)
for song in playlist_songs: for song in playlist_songs:
download_track(song[TRACK][ID], download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name})
fix_filename(name) + '/')
print('\n')
elif episode_id is not None: elif episode_id is not None:
download = True download = True
download_episode(episode_id) download_episode(episode_id)
@ -273,7 +267,7 @@ def search(search_term):
print_pos = dics.index(dic) + 1 print_pos = dics.index(dic) + 1
if print_pos == position: if print_pos == position:
if dic['type'] == TRACK: if dic['type'] == TRACK:
download_track(dic[ID]) download_track('single', dic[ID])
elif dic['type'] == ALBUM: elif dic['type'] == ALBUM:
download_album(dic[ID]) download_album(dic[ID])
elif dic['type'] == ARTIST: elif dic['type'] == ARTIST:

220
zspotify/config.py Normal file
View File

@ -0,0 +1,220 @@
import json
import os
import sys
from typing import Any
from enum import Enum
CONFIG_FILE_PATH = '../zs_config.json'
ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES'
SKIP_PREVIOUSLY_DOWNLOADED = 'SKIP_PREVIOUSLY_DOWNLOADED'
DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT'
FORCE_PREMIUM = 'FORCE_PREMIUM'
ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME'
OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT'
CHUNK_SIZE = 'CHUNK_SIZE'
SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS'
DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME'
LANGUAGE = 'LANGUAGE'
BITRATE = 'BITRATE'
SONG_ARCHIVE = 'SONG_ARCHIVE'
CREDENTIALS_LOCATION = 'CREDENTIALS_LOCATION'
OUTPUT = 'OUTPUT'
PRINT_SPLASH = 'PRINT_SPLASH'
PRINT_SKIPS = 'PRINT_SKIPS'
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
PRINT_ERRORS = 'PRINT_ERRORS'
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
ROOT_PODCAST_PATH: { 'default': '../ZSpotify Podcasts/', 'type': str, 'arg': '--root-podcast-path' },
SKIP_EXISTING_FILES: { 'default': 'True', 'type': bool, 'arg': '--skip-existing-files' },
SKIP_PREVIOUSLY_DOWNLOADED: { 'default': 'False', 'type': bool, 'arg': '--skip-previously-downloaded' },
DOWNLOAD_FORMAT: { 'default': 'ogg', 'type': str, 'arg': '--download-format' },
FORCE_PREMIUM: { 'default': 'False', 'type': bool, 'arg': '--force-premium' },
ANTI_BAN_WAIT_TIME: { 'default': '1', 'type': int, 'arg': '--anti-ban-wait-time' },
OVERRIDE_AUTO_WAIT: { 'default': 'False', 'type': bool, 'arg': '--override-auto-wait' },
CHUNK_SIZE: { 'default': '50000', 'type': int, 'arg': '--chunk-size' },
SPLIT_ALBUM_DISCS: { 'default': 'False', 'type': bool, 'arg': '--split-album-discs' },
DOWNLOAD_REAL_TIME: { 'default': 'False', 'type': bool, 'arg': '--download-real-time' },
LANGUAGE: { 'default': 'en', 'type': str, 'arg': '--language' },
BITRATE: { 'default': '', 'type': str, 'arg': '--bitrate' },
SONG_ARCHIVE: { 'default': '.song_archive', 'type': str, 'arg': '--song-archive' },
CREDENTIALS_LOCATION: { 'default': 'credentials.json', 'type': str, 'arg': '--credentials-location' },
OUTPUT: { 'default': '', 'type': str, 'arg': '--output' },
PRINT_SPLASH: { 'default': 'True', 'type': bool, 'arg': '--print-splash' },
PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' },
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_PLAYLIST_EXT = '{playlist}/{playlist_num} - {artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_LIKED_SONGS = 'Liked Songs/{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_SINGLE = '{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_ALBUM = '{artist}/{album}/{album_num} - {artist} - {song_name}.{ext}'
class Config:
Values = {}
@classmethod
def load(cls, args) -> None:
app_dir = os.path.dirname(__file__)
config_fp = CONFIG_FILE_PATH
if args.config_location:
config_fp = args.config_location
true_config_file_path = os.path.join(app_dir, config_fp)
# Load config from zs_config.json
if not os.path.exists(true_config_file_path):
with open(true_config_file_path, 'w', encoding='utf-8') as config_file:
json.dump(cls.get_default_json(), config_file, indent=4)
cls.Values = cls.get_default_json()
else:
with open(true_config_file_path, encoding='utf-8') as config_file:
jsonvalues = json.load(config_file)
cls.Values = {}
for key in CONFIG_VALUES:
if key in jsonvalues:
cls.Values[key] = cls.parse_arg_value(key, jsonvalues[key])
# Add default values for missing keys
for key in CONFIG_VALUES:
if key not in cls.Values:
cls.Values[key] = cls.parse_arg_value(key, CONFIG_VALUES[key]['default'])
# Override config from commandline arguments
for key in CONFIG_VALUES:
if key.lower() in vars(args) and vars(args)[key.lower()] is not None:
cls.Values[key] = cls.parse_arg_value(key, vars(args)[key.lower()])
if args.no_splash:
cls.Values[PRINT_SPLASH] = False
@classmethod
def get_default_json(cls) -> Any:
r = {}
for key in CONFIG_VALUES:
r[key] = CONFIG_VALUES[key]['default']
return r
@classmethod
def parse_arg_value(cls, key: str, value: Any) -> Any:
if type(value) == CONFIG_VALUES[key]['type']:
return value
if CONFIG_VALUES[key]['type'] == str:
return str(value)
if CONFIG_VALUES[key]['type'] == int:
return int(value)
if CONFIG_VALUES[key]['type'] == bool:
if str(value).lower() in ['yes', 'true', '1']:
return True
if str(value).lower() in ['no', 'false', '0']:
return False
raise ValueError("Not a boolean: " + value)
raise ValueError("Unknown Type: " + value)
@classmethod
def get(cls, key: str) -> Any:
return cls.Values.get(key)
@classmethod
def get_root_path(cls) -> str:
return cls.get(ROOT_PATH)
@classmethod
def get_root_podcast_path(cls) -> str:
return cls.get(ROOT_PODCAST_PATH)
@classmethod
def get_skip_existing_files(cls) -> bool:
return cls.get(SKIP_EXISTING_FILES)
@classmethod
def get_skip_previously_downloaded(cls) -> bool:
return cls.get(SKIP_PREVIOUSLY_DOWNLOADED)
@classmethod
def get_split_album_discs(cls) -> bool:
return cls.get(SPLIT_ALBUM_DISCS)
@classmethod
def get_chunk_size(cls) -> int():
return cls.get(CHUNK_SIZE)
@classmethod
def get_override_auto_wait(cls) -> bool:
return cls.get(OVERRIDE_AUTO_WAIT)
@classmethod
def get_force_premium(cls) -> bool:
return cls.get(FORCE_PREMIUM)
@classmethod
def get_download_format(cls) -> str:
return cls.get(DOWNLOAD_FORMAT)
@classmethod
def get_anti_ban_wait_time(cls) -> int:
return cls.get(ANTI_BAN_WAIT_TIME)
@classmethod
def get_language(cls) -> str:
return cls.get(LANGUAGE)
@classmethod
def get_download_real_time(cls) -> bool:
return cls.get(DOWNLOAD_REAL_TIME)
@classmethod
def get_bitrate(cls) -> str:
return cls.get(BITRATE)
@classmethod
def get_song_archive(cls) -> str:
return cls.get(SONG_ARCHIVE)
@classmethod
def get_credentials_location(cls) -> str:
return cls.get(CREDENTIALS_LOCATION)
@classmethod
def get_output(cls, mode: str) -> str:
v = cls.get(OUTPUT)
if v:
return v
if mode == 'playlist':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_PLAYLIST)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
return OUTPUT_DEFAULT_PLAYLIST
if mode == 'extplaylist':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_PLAYLIST_EXT)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
return OUTPUT_DEFAULT_PLAYLIST_EXT
if mode == 'liked':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_LIKED_SONGS)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
return OUTPUT_DEFAULT_LIKED_SONGS
if mode == 'single':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_SINGLE)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
return OUTPUT_DEFAULT_SINGLE
if mode == 'album':
if cls.get_split_album_discs():
split = os.path.split(OUTPUT_DEFAULT_ALBUM)
return os.path.join(split[0], 'Disc {disc_number}', split[0])
return OUTPUT_DEFAULT_ALBUM
raise ValueError()

View File

@ -80,34 +80,6 @@ USER_LIBRARY_READ = 'user-library-read'
WINDOWS_SYSTEM = 'Windows' WINDOWS_SYSTEM = 'Windows'
CREDENTIALS_JSON = 'credentials.json'
CONFIG_FILE_PATH = '../zs_config.json'
ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES'
SKIP_PREVIOUSLY_DOWNLOADED = 'SKIP_PREVIOUSLY_DOWNLOADED'
DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT'
FORCE_PREMIUM = 'FORCE_PREMIUM'
ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME'
OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT'
CHUNK_SIZE = 'CHUNK_SIZE'
SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS'
DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME'
BITRATE = 'BITRATE'
CODEC_MAP = { CODEC_MAP = {
'aac': 'aac', 'aac': 'aac',
'fdk_aac': 'libfdk_aac', 'fdk_aac': 'libfdk_aac',
@ -127,18 +99,3 @@ EXT_MAP = {
'opus': 'ogg', 'opus': 'ogg',
'vorbis': 'ogg', 'vorbis': 'ogg',
} }
CONFIG_DEFAULT_SETTINGS = {
'ROOT_PATH': '../ZSpotify Music/',
'ROOT_PODCAST_PATH': '../ZSpotify Podcasts/',
'SKIP_EXISTING_FILES': True,
'SKIP_PREVIOUSLY_DOWNLOADED': False,
'DOWNLOAD_FORMAT': 'ogg',
'FORCE_PREMIUM': False,
'ANTI_BAN_WAIT_TIME': 1,
'OVERRIDE_AUTO_WAIT': False,
'CHUNK_SIZE': 50000,
'SPLIT_ALBUM_DISCS': False,
'DOWNLOAD_REAL_TIME': False,
'LANGUAGE': 'en'
}

View File

@ -1,6 +1,5 @@
from tqdm import tqdm
from const import ITEMS, ID, TRACK, NAME from const import ITEMS, ID, TRACK, NAME
from termoutput import Printer
from track import download_track from track import download_track
from utils import fix_filename, split_input from utils import fix_filename, split_input
from zspotify import ZSpotify from zspotify import ZSpotify
@ -51,11 +50,10 @@ def download_playlist(playlist):
"""Downloads all the songs from a playlist""" """Downloads all the songs from a playlist"""
playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK][ID]] playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK][ID]]
p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
enum = 1 enum = 1
for song in p_bar: for song in p_bar:
download_track(song[TRACK][ID], fix_filename(playlist[NAME].strip()) + '/', download_track('extplaylist', song[TRACK][ID], extra_keys={'playlist': playlist[NAME], 'playlist_num': str(enum).zfill(2)}, disable_progressbar=True)
prefix=True, prefix_value=str(enum) ,disable_progressbar=True)
p_bar.set_description(song[TRACK][NAME]) p_bar.set_description(song[TRACK][NAME])
enum += 1 enum += 1

View File

@ -3,10 +3,9 @@ from typing import Optional, Tuple
from librespot.audio.decoders import VorbisOnlyAudioQuality from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.metadata import EpisodeId from librespot.metadata import EpisodeId
from tqdm import tqdm
from const import (CHUNK_SIZE, ERROR, ID, ITEMS, NAME, ROOT_PODCAST_PATH, SHOW, from const import (ERROR, ID, ITEMS, NAME, SHOW)
SKIP_EXISTING_FILES) from termoutput import PrintChannel, Printer
from utils import create_download_directory, fix_filename from utils import create_download_directory, fix_filename
from zspotify import ZSpotify from zspotify import ZSpotify
@ -71,7 +70,7 @@ def download_episode(episode_id) -> None:
extra_paths = podcast_name + '/' extra_paths = podcast_name + '/'
if podcast_name is None: if podcast_name is None:
print('### SKIPPING: (EPISODE NOT FOUND) ###') Printer.print(PrintChannel.ERRORS, '### SKIPPING: (EPISODE NOT FOUND) ###')
else: else:
filename = podcast_name + ' - ' + episode_name filename = podcast_name + ' - ' + episode_name
@ -80,7 +79,7 @@ def download_episode(episode_id) -> None:
download_directory = os.path.join( download_directory = os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
ZSpotify.get_config(ROOT_PODCAST_PATH), ZSpotify.CONFIG.get_root_podcast_path(),
extra_paths, extra_paths,
) )
download_directory = os.path.realpath(download_directory) download_directory = os.path.realpath(download_directory)
@ -97,27 +96,21 @@ def download_episode(episode_id) -> None:
if ( if (
os.path.isfile(filepath) os.path.isfile(filepath)
and os.path.getsize(filepath) == total_size and os.path.getsize(filepath) == total_size
and ZSpotify.get_config(SKIP_EXISTING_FILES) and ZSpotify.CONFIG.get_skip_existing_files()
): ):
print( Printer.print(PrintChannel.SKIPS, "\n### SKIPPING: " + podcast_name + " - " + episode_name + " (EPISODE ALREADY EXISTS) ###")
"\n### SKIPPING:",
podcast_name,
"-",
episode_name,
"(EPISODE ALREADY EXISTS) ###",
)
return return
with open(filepath, 'wb') as file, tqdm( with open(filepath, 'wb') as file, Printer.progress(
desc=filename, desc=filename,
total=total_size, total=total_size,
unit='B', unit='B',
unit_scale=True, unit_scale=True,
unit_divisor=1024 unit_divisor=1024
) as bar: ) as bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): for _ in range(int(total_size / ZSpotify.CONFIG.get_chunk_size()) + 1):
bar.update(file.write( bar.update(file.write(
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size())))
else: else:
filepath = os.path.join(download_directory, f"{filename}.mp3") filepath = os.path.join(download_directory, f"{filename}.mp3")
download_podcast_directly(direct_download_url, filepath) download_podcast_directly(direct_download_url, filepath)

26
zspotify/termoutput.py Normal file
View File

@ -0,0 +1,26 @@
from enum import Enum
from tqdm import tqdm
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS
from zspotify import ZSpotify
class PrintChannel(Enum):
SPLASH = PRINT_SPLASH
SKIPS = PRINT_SKIPS
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
ERRORS = PRINT_ERRORS
DOWNLOADS = PRINT_DOWNLOADS
class Printer:
@staticmethod
def print(channel: PrintChannel, msg: str) -> None:
if ZSpotify.CONFIG.get(channel.value):
print(msg)
@staticmethod
def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
if not ZSpotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value):
disable = True
return tqdm(iterable=iterable, desc=desc, total=total, disable=disable, unit=unit, unit_scale=unit_scale, unit_divisor=unit_divisor)

View File

@ -7,12 +7,10 @@ from librespot.audio.decoders import AudioQuality
from librespot.metadata import TrackId from librespot.metadata import TrackId
from ffmpy import FFmpeg from ffmpy import FFmpeg
from pydub import AudioSegment from pydub import AudioSegment
from tqdm import tqdm
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, \ RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
CHUNK_SIZE, SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, BITRATE, CODEC_MAP, EXT_MAP, DOWNLOAD_REAL_TIME, \ from termoutput import Printer, PrintChannel
SKIP_PREVIOUSLY_DOWNLOADED, DURATION_MS
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \ from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive
from zspotify import ZSpotify from zspotify import ZSpotify
@ -71,58 +69,60 @@ def get_song_duration(song_id: str) -> float:
return duration return duration
# noinspection PyBroadException # noinspection PyBroadException
def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None: def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=False) -> None:
""" Downloads raw song audio from Spotify """ """ Downloads raw song audio from Spotify """
try: try:
output_template = ZSpotify.CONFIG.get_output(mode)
(artists, album_name, name, image_url, release_year, disc_number, (artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id) track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
if ZSpotify.get_config(SPLIT_ALBUM_DISCS):
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths, f'Disc {disc_number}')
else:
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths)
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
if prefix:
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit(
) else f'{prefix_value} - {song_name}'
filename = os.path.join( for k in extra_keys:
download_directory, f'{song_name}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT).lower())}') output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
output_template = output_template.replace("{release_year}", fix_filename(release_year))
output_template = output_template.replace("{disc_number}", fix_filename(disc_number))
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower()))
filename = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), output_template)
filedir = os.path.dirname(filename)
archive_directory = os.path.join(os.path.dirname(__file__), ZSpotify.get_config(ROOT_PATH))
check_name = os.path.isfile(filename) and os.path.getsize(filename) check_name = os.path.isfile(filename) and os.path.getsize(filename)
check_id = scraped_song_id in get_directory_song_ids(download_directory) check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded(scraped_song_id, archive_directory) check_all_time = scraped_song_id in get_previously_downloaded()
# a song with the same name is installed # a song with the same name is installed
if not check_id and check_name: if not check_id and check_name:
c = len([file for file in os.listdir(download_directory) c = len([file for file in os.listdir(filedir) if re.search(f'^{filename}_', str(file))]) + 1
if re.search(f'^{song_name}_', file)]) + 1
filename = os.path.join( fname = os.path.splitext(os.path.basename(filename))[0]
download_directory, f'{song_name}_{c}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT))}') ext = os.path.splitext(os.path.basename(filename))[1]
filename = os.path.join(filedir, f'{fname}_{c}{ext}')
except Exception as e: except Exception as e:
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###') Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
print(e) Printer.print(PrintChannel.ERRORS, str(e) + "\n")
else: else:
try: try:
if not is_playable: if not is_playable:
print('\n### SKIPPING:', song_name, Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
'(SONG IS UNAVAILABLE) ###')
else: else:
if check_id and check_name and ZSpotify.get_config(SKIP_EXISTING_FILES): if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
print('\n### SKIPPING:', song_name, Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
'(SONG ALREADY EXISTS) ###')
elif check_all_time and ZSpotify.get_config(SKIP_PREVIOUSLY_DOWNLOADED): elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
print('\n### SKIPPING:', song_name, Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
'(SONG ALREADY DOWNLOADED ONCE) ###')
else: else:
if track_id != scraped_song_id: if track_id != scraped_song_id:
@ -130,10 +130,10 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
track_id = TrackId.from_base62(track_id) track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream( stream = ZSpotify.get_content_stream(
track_id, ZSpotify.DOWNLOAD_QUALITY) track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(download_directory) create_download_directory(filedir)
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm( with open(filename, 'wb') as file, Printer.progress(
desc=song_name, desc=song_name,
total=total_size, total=total_size,
unit='B', unit='B',
@ -141,31 +141,31 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
unit_divisor=1024, unit_divisor=1024,
disable=disable_progressbar disable=disable_progressbar
) as p_bar: ) as p_bar:
pause = duration_ms / ZSpotify.get_config(CHUNK_SIZE) pause = duration_ms / ZSpotify.CONFIG.get_chunk_size()
for chunk in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): for chunk in range(int(total_size / ZSpotify.CONFIG.get_chunk_size()) + 1):
data = stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)) data = stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data)) p_bar.update(file.write(data))
if ZSpotify.get_config(DOWNLOAD_REAL_TIME): if ZSpotify.CONFIG.get_download_real_time():
time.sleep(pause) time.sleep(pause)
convert_audio_format(filename) convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name, set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number)
release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url) set_music_thumbnail(filename, image_url)
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, os.path.dirname(__file__))}" ###' + "\n")
# add song id to archive file # add song id to archive file
if ZSpotify.get_config(SKIP_PREVIOUSLY_DOWNLOADED): if ZSpotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, archive_directory) add_to_archive(scraped_song_id, os.path.basename(filename), artists[0], name)
# add song id to download directory's .song_ids file # add song id to download directory's .song_ids file
if not check_id: if not check_id:
add_to_directory_song_ids(download_directory, scraped_song_id) add_to_directory_song_ids(filedir, scraped_song_id, os.path.basename(filename), artists[0], name)
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT): if not ZSpotify.CONFIG.get_anti_ban_wait_time():
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME)) time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
except Exception as e: except Exception as e:
print('### SKIPPING:', song_name, Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
'(GENERAL DOWNLOAD ERROR) ###') Printer.print(PrintChannel.ERRORS, str(e) + "\n")
print(e)
if os.path.exists(filename): if os.path.exists(filename):
os.remove(filename) os.remove(filename)
@ -175,10 +175,10 @@ def convert_audio_format(filename) -> None:
temp_filename = f'{os.path.splitext(filename)[0]}.tmp' temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
os.replace(filename, temp_filename) os.replace(filename, temp_filename)
download_format = ZSpotify.get_config(DOWNLOAD_FORMAT).lower() download_format = ZSpotify.CONFIG.get_download_format().lower()
file_codec = CODEC_MAP.get(download_format, 'copy') file_codec = CODEC_MAP.get(download_format, 'copy')
if file_codec != 'copy': if file_codec != 'copy':
bitrate = ZSpotify.get_config(BITRATE) bitrate = ZSpotify.CONFIG.get_bitrate()
if not bitrate: if not bitrate:
if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH: if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH:
bitrate = '320k' bitrate = '320k'

View File

@ -1,3 +1,4 @@
import datetime
import os import os
import platform import platform
import re import re
@ -11,6 +12,8 @@ import requests
from const import ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \ from const import ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
WINDOWS_SYSTEM, ALBUMARTIST WINDOWS_SYSTEM, ALBUMARTIST
from zspotify import ZSpotify
class MusicFormat(str, Enum): class MusicFormat(str, Enum):
MP3 = 'mp3', MP3 = 'mp3',
@ -27,29 +30,29 @@ def create_download_directory(download_path: str) -> None:
with open(hidden_file_path, 'w', encoding='utf-8') as f: with open(hidden_file_path, 'w', encoding='utf-8') as f:
pass pass
def get_previously_downloaded(song_id: str, archive_directory: str) -> List[str]: def get_previously_downloaded() -> List[str]:
""" Returns list of all time downloaded songs """ """ Returns list of all time downloaded songs """
ids = [] ids = []
archive_path = os.path.join(archive_directory, '.song_archive') archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
if os.path.exists(archive_path): if os.path.exists(archive_path):
with open(archive_path, 'r', encoding='utf-8') as f: with open(archive_path, 'r', encoding='utf-8') as f:
ids = [line.strip() for line in f.readlines()] ids = [line.strip().split('\t')[0] for line in f.readlines()]
return ids return ids
def add_to_archive(song_id: str, archive_directory: str) -> None: def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Adds song id to all time installed songs archive """ """ Adds song id to all time installed songs archive """
archive_path = os.path.join(archive_directory, '.song_archive') archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
if os.path.exists(archive_path): if os.path.exists(archive_path):
with open(archive_path, 'a', encoding='utf-8') as f: with open(archive_path, 'a', encoding='utf-8') as file:
f.write(f'{song_id}\n') file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
else: else:
with open(archive_path, 'w', encoding='utf-8') as f: with open(archive_path, 'w', encoding='utf-8') as file:
f.write(f'{song_id}\n') file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_directory_song_ids(download_path: str) -> List[str]: def get_directory_song_ids(download_path: str) -> List[str]:
""" Gets song ids of songs in directory """ """ Gets song ids of songs in directory """
@ -59,18 +62,18 @@ def get_directory_song_ids(download_path: str) -> List[str]:
hidden_file_path = os.path.join(download_path, '.song_ids') hidden_file_path = os.path.join(download_path, '.song_ids')
if os.path.isfile(hidden_file_path): if os.path.isfile(hidden_file_path):
with open(hidden_file_path, 'r', encoding='utf-8') as file: with open(hidden_file_path, 'r', encoding='utf-8') as file:
song_ids.extend([line.strip() for line in file.readlines()]) song_ids.extend([line.strip().split('\t')[0] for line in file.readlines()])
return song_ids return song_ids
def add_to_directory_song_ids(download_path: str, song_id: str) -> None: def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Appends song_id to .song_ids file in directory """ """ Appends song_id to .song_ids file in directory """
hidden_file_path = os.path.join(download_path, '.song_ids') hidden_file_path = os.path.join(download_path, '.song_ids')
# not checking if file exists because we need an exception # not checking if file exists because we need an exception
# to be raised if something is wrong # to be raised if something is wrong
with open(hidden_file_path, 'a', encoding='utf-8') as file: with open(hidden_file_path, 'a', encoding='utf-8') as file:
file.write(f'{song_id}\n') file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_downloaded_song_duration(filename: str) -> float: def get_downloaded_song_duration(filename: str) -> float:
""" Returns the downloaded file's duration in seconds """ """ Returns the downloaded file's duration in seconds """
@ -83,12 +86,6 @@ def get_downloaded_song_duration(filename: str) -> float:
return duration return duration
def wait(seconds: int = 3) -> None:
""" Pause for a set number of seconds """
for second in range(seconds)[::-1]:
print(f'\rWait for {second + 1} second(s)...', end='')
time.sleep(1)
def split_input(selection) -> List[str]: def split_input(selection) -> List[str]:
""" Returns a list of inputted strings """ """ Returns a list of inputted strings """
@ -103,15 +100,15 @@ def split_input(selection) -> List[str]:
return inputs return inputs
def splash() -> None: def splash() -> str:
""" Displays splash screen """ """ Displays splash screen """
print(""" return """
""") """
def clear() -> None: def clear() -> None:
@ -253,4 +250,4 @@ def fix_filename(name):
>>> all('_' == fix_filename(chr(i)) for i in list(range(32))) >>> all('_' == fix_filename(chr(i)) for i in list(range(32)))
True True
""" """
return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", name, flags=re.IGNORECASE) return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", str(name), flags=re.IGNORECASE)

View File

@ -16,28 +16,30 @@ import requests
from librespot.audio.decoders import VorbisOnlyAudioQuality from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session from librespot.core import Session
from const import CREDENTIALS_JSON, TYPE, \ from const import TYPE, \
PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, \ PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, \
PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ, CONFIG_DEFAULT_SETTINGS PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ
from utils import MusicFormat from config import Config
class ZSpotify: class ZSpotify:
SESSION: Session = None SESSION: Session = None
DOWNLOAD_QUALITY = None DOWNLOAD_QUALITY = None
CONFIG = {} CONFIG: Config = Config()
def __init__(self): def __init__(self, args):
ZSpotify.load_config() ZSpotify.CONFIG.load(args)
ZSpotify.login() ZSpotify.login()
@classmethod @classmethod
def login(cls): def login(cls):
""" Authenticates with Spotify and saves credentials to a file """ """ Authenticates with Spotify and saves credentials to a file """
if os.path.isfile(CREDENTIALS_JSON): cred_location = os.path.join(os.getcwd(), Config.get_credentials_location())
if os.path.isfile(cred_location):
try: try:
cls.SESSION = Session.Builder().stored_file().create() cls.SESSION = Session.Builder().stored_file(cred_location).create()
return return
except RuntimeError: except RuntimeError:
pass pass
@ -52,22 +54,6 @@ class ZSpotify:
except RuntimeError: except RuntimeError:
pass pass
@classmethod
def load_config(cls) -> None:
app_dir = os.path.dirname(__file__)
true_config_file_path = os.path.join(app_dir, CONFIG_FILE_PATH)
if not os.path.exists(true_config_file_path):
with open(true_config_file_path, 'w', encoding='utf-8') as config_file:
json.dump(CONFIG_DEFAULT_SETTINGS, config_file, indent=4)
cls.CONFIG = CONFIG_DEFAULT_SETTINGS
else:
with open(true_config_file_path, encoding='utf-8') as config_file:
cls.CONFIG = json.load(config_file)
@classmethod
def get_config(cls, key) -> Any:
return cls.CONFIG.get(key)
@classmethod @classmethod
def get_content_stream(cls, content_id, quality): def get_content_stream(cls, content_id, quality):
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None) return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
@ -80,14 +66,14 @@ class ZSpotify:
def get_auth_header(cls): def get_auth_header(cls):
return { return {
'Authorization': f'Bearer {cls.__get_auth_token()}', 'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get("LANGUAGE")}' 'Accept-Language': f'{cls.CONFIG.get_language()}'
} }
@classmethod @classmethod
def get_auth_header_and_params(cls, limit, offset): def get_auth_header_and_params(cls, limit, offset):
return { return {
'Authorization': f'Bearer {cls.__get_auth_token()}', 'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get("LANGUAGE")}' 'Accept-Language': f'{cls.CONFIG.get_language()}'
}, {LIMIT: limit, OFFSET: offset} }, {LIMIT: limit, OFFSET: offset}
@classmethod @classmethod
@ -104,4 +90,4 @@ class ZSpotify:
@classmethod @classmethod
def check_premium(cls) -> bool: def check_premium(cls) -> bool:
""" If user has spotify premium return true """ """ If user has spotify premium return true """
return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.get_config(FORCE_PREMIUM) return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.CONFIG.get_force_premium()