mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2024-11-29 19:24:34 +01:00
Merge pull request #219 from leonbohmann/dev-genre-tags
Added querying of genres of each track
This commit is contained in:
commit
606125db07
@ -25,7 +25,10 @@ PRINT_SKIPS = 'PRINT_SKIPS'
|
||||
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
|
||||
PRINT_ERRORS = 'PRINT_ERRORS'
|
||||
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
|
||||
PRINT_API_ERRORS = 'PRINT_API_ERRORS'
|
||||
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
|
||||
MD_ALLGENRES = 'MD_ALLGENRES'
|
||||
MD_GENREDELIMITER = 'MD_GENREDELIMITER'
|
||||
|
||||
CONFIG_VALUES = {
|
||||
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
|
||||
@ -49,7 +52,10 @@ CONFIG_VALUES = {
|
||||
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
|
||||
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
|
||||
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
|
||||
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' },
|
||||
PRINT_API_ERRORS: { 'default': 'False', 'type': bool, 'arg': '--print-api-errors' },
|
||||
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
|
||||
MD_GENREDELIMITER: { 'default': ';', 'type': str, 'arg': '--md-genredelimiter' },
|
||||
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
|
||||
}
|
||||
|
||||
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
|
||||
@ -192,7 +198,15 @@ class Config:
|
||||
if cls.get(TEMP_DOWNLOAD_DIR) == '':
|
||||
return ''
|
||||
return os.path.join(cls.get_root_path(), cls.get(TEMP_DOWNLOAD_DIR))
|
||||
|
||||
@classmethod
|
||||
def get_allGenres(cls) -> bool:
|
||||
return cls.get(MD_ALLGENRES)
|
||||
|
||||
@classmethod
|
||||
def get_allGenresDelimiter(cls) -> bool:
|
||||
return cls.get(MD_GENREDELIMITER)
|
||||
|
||||
@classmethod
|
||||
def get_output(cls, mode: str) -> str:
|
||||
v = cls.get(OUTPUT)
|
||||
|
@ -20,6 +20,10 @@ ARTISTS = 'artists'
|
||||
|
||||
ALBUMARTIST = 'albumartist'
|
||||
|
||||
GENRES = 'genres'
|
||||
|
||||
GENRE = 'genre'
|
||||
|
||||
ARTWORK = 'artwork'
|
||||
|
||||
TRACKS = 'tracks'
|
||||
|
@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
from tqdm import tqdm
|
||||
|
||||
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS
|
||||
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS, PRINT_API_ERRORS
|
||||
from zspotify import ZSpotify
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ class PrintChannel(Enum):
|
||||
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
|
||||
ERRORS = PRINT_ERRORS
|
||||
DOWNLOADS = PRINT_DOWNLOADS
|
||||
API_ERRORS = PRINT_API_ERRORS
|
||||
|
||||
|
||||
class Printer:
|
||||
|
@ -1,5 +1,6 @@
|
||||
import os
|
||||
import re
|
||||
from threading import Thread
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any, Tuple, List
|
||||
@ -8,7 +9,7 @@ from librespot.audio.decoders import AudioQuality
|
||||
from librespot.metadata import TrackId
|
||||
from ffmpy import FFmpeg
|
||||
|
||||
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
||||
from const import TRACKS, ALBUM, GENRES, GENRE, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
||||
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
|
||||
from termoutput import Printer, PrintChannel
|
||||
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
|
||||
@ -16,6 +17,8 @@ from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_down
|
||||
from zspotify import ZSpotify
|
||||
import traceback
|
||||
|
||||
from utils import Loader
|
||||
|
||||
def get_saved_tracks() -> list:
|
||||
""" Returns user's saved tracks """
|
||||
songs = []
|
||||
@ -33,17 +36,32 @@ def get_saved_tracks() -> list:
|
||||
return songs
|
||||
|
||||
|
||||
def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
|
||||
def get_song_info(song_id) -> Tuple[List[str], List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
|
||||
""" Retrieves metadata for downloaded songs """
|
||||
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
|
||||
with Loader("Fetching track information..."):
|
||||
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
|
||||
|
||||
if not TRACKS in info:
|
||||
raise ValueError(f'Invalid response from TRACKS_URL:\n{raw}')
|
||||
|
||||
try:
|
||||
artists = []
|
||||
genres = []
|
||||
for data in info[TRACKS][0][ARTISTS]:
|
||||
artists.append(data[NAME])
|
||||
# query artist genres via href, which will be the api url
|
||||
with Loader("Fetching artist information..."):
|
||||
(raw, artistInfo) = ZSpotify.invoke_url(f'{data["href"]}')
|
||||
if ZSpotify.CONFIG.get_allGenres() and len(artistInfo[GENRES]) > 0:
|
||||
for genre in artistInfo[GENRES]:
|
||||
genres.append(genre)
|
||||
elif len(artistInfo[GENRES]) > 0:
|
||||
genres.append(artistInfo[GENRES][0])
|
||||
|
||||
if len(genres) == 0:
|
||||
Printer.print(PrintChannel.SKIPS, '### No Genre found.')
|
||||
genres.append('')
|
||||
|
||||
album_name = info[TRACKS][0][ALBUM][NAME]
|
||||
name = info[TRACKS][0][NAME]
|
||||
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
|
||||
@ -54,7 +72,7 @@ def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any
|
||||
is_playable = info[TRACKS][0][IS_PLAYABLE]
|
||||
duration_ms = info[TRACKS][0][DURATION_MS]
|
||||
|
||||
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
|
||||
return artists, genres, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
|
||||
except Exception as e:
|
||||
raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw}')
|
||||
|
||||
@ -82,9 +100,12 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
||||
try:
|
||||
output_template = ZSpotify.CONFIG.get_output(mode)
|
||||
|
||||
(artists, album_name, name, image_url, release_year, disc_number,
|
||||
(artists, genres, album_name, name, image_url, release_year, disc_number,
|
||||
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
|
||||
|
||||
|
||||
prepareDownloadLoader = Loader("Preparing download...");
|
||||
prepareDownloadLoader.start()
|
||||
|
||||
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
|
||||
|
||||
for k in extra_keys:
|
||||
@ -131,12 +152,15 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
||||
else:
|
||||
try:
|
||||
if not is_playable:
|
||||
prepareDownloadLoader.stop();
|
||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
|
||||
else:
|
||||
if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
|
||||
prepareDownloadLoader.stop();
|
||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
|
||||
|
||||
elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
|
||||
prepareDownloadLoader.stop();
|
||||
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
|
||||
|
||||
else:
|
||||
@ -147,6 +171,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
||||
create_download_directory(filedir)
|
||||
total_size = stream.input_stream.size
|
||||
|
||||
prepareDownloadLoader.stop();
|
||||
|
||||
time_start = time.time()
|
||||
downloaded = 0
|
||||
with open(filename_temp, 'wb') as file, Printer.progress(
|
||||
@ -170,7 +196,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
||||
time_downloaded = time.time()
|
||||
|
||||
convert_audio_format(filename_temp)
|
||||
set_audio_tags(filename_temp, artists, name, album_name, release_year, disc_number, track_number)
|
||||
set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number)
|
||||
set_music_thumbnail(filename_temp, image_url)
|
||||
|
||||
if filename_temp != filename:
|
||||
@ -196,8 +222,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
|
||||
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
|
||||
if os.path.exists(filename_temp):
|
||||
os.remove(filename_temp)
|
||||
|
||||
|
||||
|
||||
prepareDownloadLoader.stop()
|
||||
def convert_audio_format(filename) -> None:
|
||||
""" Converts raw audio into playable file """
|
||||
temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
|
||||
@ -224,6 +250,9 @@ def convert_audio_format(filename) -> None:
|
||||
inputs={temp_filename: None},
|
||||
outputs={filename: output_params}
|
||||
)
|
||||
ff_m.run()
|
||||
|
||||
with Loader("Converting file..."):
|
||||
ff_m.run()
|
||||
|
||||
if os.path.exists(temp_filename):
|
||||
os.remove(temp_filename)
|
||||
|
@ -10,7 +10,7 @@ from typing import List, Tuple
|
||||
import music_tag
|
||||
import requests
|
||||
|
||||
from const import ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
|
||||
from const import ARTIST, GENRE, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
|
||||
WINDOWS_SYSTEM, ALBUMARTIST
|
||||
from zspotify import ZSpotify
|
||||
|
||||
@ -124,11 +124,12 @@ def clear() -> None:
|
||||
os.system('clear')
|
||||
|
||||
|
||||
def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number) -> None:
|
||||
def set_audio_tags(filename, artists, genres, name, album_name, release_year, disc_number, track_number) -> None:
|
||||
""" sets music_tag metadata """
|
||||
tags = music_tag.load_file(filename)
|
||||
tags[ALBUMARTIST] = artists[0]
|
||||
tags[ARTIST] = conv_artist_format(artists)
|
||||
tags[GENRE] = genres[0] if not ZSpotify.CONFIG.get_allGenres() else ZSpotify.CONFIG.get_allGenresDelimiter().join(genres)
|
||||
tags[TRACKTITLE] = name
|
||||
tags[ALBUM] = album_name
|
||||
tags[YEAR] = release_year
|
||||
@ -279,3 +280,84 @@ def fmt_seconds(secs: float) -> str:
|
||||
return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
||||
else:
|
||||
return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
|
||||
|
||||
|
||||
# load symbol from:
|
||||
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
|
||||
|
||||
# imports
|
||||
from itertools import cycle
|
||||
from shutil import get_terminal_size
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
class Loader:
|
||||
"""Busy symbol.
|
||||
|
||||
Can be called inside a context:
|
||||
|
||||
with Loader("This take some Time..."):
|
||||
# do something
|
||||
pass
|
||||
"""
|
||||
def __init__(self, desc="Loading...", end='', timeout=0.1, mode='std1'):
|
||||
"""
|
||||
A loader-like context manager
|
||||
|
||||
Args:
|
||||
desc (str, optional): The loader's description. Defaults to "Loading...".
|
||||
end (str, optional): Final print. Defaults to "".
|
||||
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
|
||||
"""
|
||||
self.desc = desc
|
||||
self.end = end
|
||||
self.timeout = timeout
|
||||
|
||||
self._thread = Thread(target=self._animate, daemon=True)
|
||||
if mode == 'std1':
|
||||
self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
|
||||
elif mode == 'std2':
|
||||
self.steps = ["◜","◝","◞","◟"]
|
||||
elif mode == 'std3':
|
||||
self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","✨ ","\u3000 ","\u3000 ","\u3000 "]
|
||||
elif mode == 'prog':
|
||||
self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
|
||||
|
||||
self.done = False
|
||||
|
||||
def start(self):
|
||||
self._thread.start()
|
||||
return self
|
||||
|
||||
def _animate(self):
|
||||
for c in cycle(self.steps):
|
||||
if self.done:
|
||||
break
|
||||
print(f"\r\t{c} {self.desc} ", flush=True, end="")
|
||||
sleep(self.timeout)
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
|
||||
def stop(self):
|
||||
self.done = True
|
||||
cols = get_terminal_size((80, 20)).columns
|
||||
print("\r" + " " * cols, end="", flush=True)
|
||||
|
||||
if self.end != "":
|
||||
print(f"\r{self.end}", flush=True)
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
# handle exceptions with those variables ^
|
||||
self.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with Loader("Loading with context manager..."):
|
||||
for i in range(10):
|
||||
sleep(0.25)
|
||||
|
||||
loader = Loader("Loading with object...", "That was fast!", 0.05).start()
|
||||
for i in range(10):
|
||||
sleep(0.25)
|
||||
loader.stop()
|
||||
|
@ -9,7 +9,7 @@ It's like youtube-dl, but for Spotify.
|
||||
import os
|
||||
import os.path
|
||||
from getpass import getpass
|
||||
|
||||
import time
|
||||
import requests
|
||||
from librespot.audio.decoders import VorbisOnlyAudioQuality
|
||||
from librespot.core import Session
|
||||
@ -19,8 +19,7 @@ from const import TYPE, \
|
||||
PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ
|
||||
from config import Config
|
||||
|
||||
|
||||
class ZSpotify:
|
||||
class ZSpotify:
|
||||
SESSION: Session = None
|
||||
DOWNLOAD_QUALITY = None
|
||||
CONFIG: Config = Config()
|
||||
@ -82,10 +81,21 @@ class ZSpotify:
|
||||
return requests.get(url, headers=headers, params=params).json()
|
||||
|
||||
@classmethod
|
||||
def invoke_url(cls, url):
|
||||
def invoke_url(cls, url, tryCount = 0):
|
||||
# we need to import that here, otherwise we will get circular imports!
|
||||
from termoutput import Printer, PrintChannel
|
||||
headers = cls.get_auth_header()
|
||||
response = requests.get(url, headers=headers)
|
||||
return response.text, response.json()
|
||||
responseText = response.text
|
||||
responseJson = response.json()
|
||||
|
||||
if 'error' in responseJson and tryCount < 5:
|
||||
|
||||
Printer.Print(PrintChannel.API_ERROR, f"Spotify API Error ({responseJson['error']['status']}): {responseJson['error']['message']}")
|
||||
time.sleep(5)
|
||||
return cls.invoke_url(url, tryCount + 1)
|
||||
|
||||
return responseText, responseJson
|
||||
|
||||
@classmethod
|
||||
def check_premium(cls) -> bool:
|
||||
|
Loading…
Reference in New Issue
Block a user