Merge branch 'main' into master

This commit is contained in:
Logykk
2021-12-01 20:11:03 +13:00
committed by GitHub
10 changed files with 196 additions and 88 deletions

View File

@@ -26,18 +26,18 @@ def get_album_tracks(album_id):
def get_album_name(album_id):
""" Returns album name """
resp = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}')
(raw, resp) = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}')
return resp[ARTISTS][0][NAME], fix_filename(resp[NAME])
def get_artist_albums(artist_id):
""" Returns artist's albums """
resp = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums?include_groups=album%2Csingle')
(raw, resp) = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums?include_groups=album%2Csingle')
# Return a list each album's id
album_ids = [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))]
# Recursive requests to get all albums including singles an EPs
while resp['next'] is not None:
resp = ZSpotify.invoke_url(resp['next'])
(raw, resp) = ZSpotify.invoke_url(resp['next'])
album_ids.extend([resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))])
return album_ids

View File

@@ -9,7 +9,7 @@ from playlist import get_playlist_songs, get_playlist_info, download_from_user_p
from podcast import download_episode, get_show_episodes
from termoutput import Printer, PrintChannel
from track import download_track, get_saved_tracks
from utils import fix_filename, splash, split_input, regex_input_for_urls
from utils import splash, split_input, regex_input_for_urls
from zspotify import ZSpotify
SEARCH_URL = 'https://api.spotify.com/v1/search'
@@ -49,7 +49,7 @@ def client(args) -> None:
if args.liked_songs:
for song in get_saved_tracks():
if not song[TRACK][NAME]:
Printer.print(PrintChannel.ERRORS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
else:
download_track('liked', song[TRACK][ID])
@@ -85,8 +85,11 @@ def download_from_urls(urls: list[str]) -> bool:
enum = 1
char_num = len(str(len(playlist_songs)))
for song in playlist_songs:
download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name, 'playlist_num': str(enum).zfill(char_num)})
enum += 1
if not song[TRACK][NAME]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
else:
download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name, 'playlist_num': str(enum).zfill(char_num)})
enum += 1
elif episode_id is not None:
download = True
download_episode(episode_id)
@@ -256,11 +259,9 @@ def search(search_term):
print('NO RESULTS FOUND - EXITING...')
else:
selection = ''
print('\n> SELECT A DOWNLOAD OPTION BY ID')
print('> SELECT A DOWNLOAD OPTION BY ID')
print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s')
print('> For example, typing 5 to get option 5 or 10-20 to get\nevery option from 10-20 (inclusive)\n')
print('> Or type 10,12,15,18 to get those options in particular')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s\n')
while len(selection) == 0:
selection = str(input('ID(s): '))
inputs = split_input(selection)

View File

@@ -1,8 +1,6 @@
import json
import os
import sys
from typing import Any
from enum import Enum
CONFIG_FILE_PATH = '../zs_config.json'
@@ -27,6 +25,7 @@ PRINT_SKIPS = 'PRINT_SKIPS'
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
PRINT_ERRORS = 'PRINT_ERRORS'
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
@@ -50,6 +49,7 @@ CONFIG_VALUES = {
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' },
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
@@ -129,11 +129,11 @@ class Config:
@classmethod
def get_root_path(cls) -> str:
return cls.get(ROOT_PATH)
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PATH))
@classmethod
def get_root_podcast_path(cls) -> str:
return cls.get(ROOT_PODCAST_PATH)
return os.path.join(os.path.dirname(__file__), cls.get(ROOT_PODCAST_PATH))
@classmethod
def get_skip_existing_files(cls) -> bool:
@@ -181,11 +181,17 @@ class Config:
@classmethod
def get_song_archive(cls) -> str:
return cls.get(SONG_ARCHIVE)
return os.path.join(cls.get_root_path(), cls.get(SONG_ARCHIVE))
@classmethod
def get_credentials_location(cls) -> str:
return cls.get(CREDENTIALS_LOCATION)
return os.path.join(os.getcwd(), cls.get(CREDENTIALS_LOCATION))
@classmethod
def get_temp_download_dir(cls) -> str:
if cls.get(TEMP_DOWNLOAD_DIR) == '':
return ''
return os.path.join(cls.get_root_path(), cls.get(TEMP_DOWNLOAD_DIR))
@classmethod
def get_output(cls, mode: str) -> str:

View File

@@ -1,7 +1,7 @@
from const import ITEMS, ID, TRACK, NAME
from termoutput import Printer
from track import download_track
from utils import fix_filename, split_input
from utils import split_input
from zspotify import ZSpotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
@@ -42,7 +42,7 @@ def get_playlist_songs(playlist_id):
def get_playlist_info(playlist_id):
""" Returns information scraped from playlist """
resp = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
(raw, resp) = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
return resp['name'].strip(), resp['owner']['display_name'].strip()
@@ -70,9 +70,7 @@ def download_from_user_playlist():
selection = ''
print('\n> SELECT A PLAYLIST BY ID')
print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s')
print('> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n')
print('> Or type 10,12,15,18 to get those playlists in particular')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s\n')
while len(selection) == 0:
selection = str(input('ID(s): '))
playlist_choices = map(int, split_input(selection))

View File

@@ -1,7 +1,6 @@
import os
from typing import Optional, Tuple
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.metadata import EpisodeId
from const import (ERROR, ID, ITEMS, NAME, SHOW)
@@ -14,7 +13,7 @@ SHOWS_URL = 'https://api.spotify.com/v1/shows'
def get_episode_info(episode_id_str) -> Tuple[Optional[str], Optional[str]]:
info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
(raw, info) = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if ERROR in info:
return None, None
return fix_filename(info[SHOW][NAME]), fix_filename(info[NAME])
@@ -70,18 +69,14 @@ def download_episode(episode_id) -> None:
extra_paths = podcast_name + '/'
if podcast_name is None:
Printer.print(PrintChannel.ERRORS, '### SKIPPING: (EPISODE NOT FOUND) ###')
Printer.print(PrintChannel.SKIPS, '### SKIPPING: (EPISODE NOT FOUND) ###')
else:
filename = podcast_name + ' - ' + episode_name
direct_download_url = ZSpotify.invoke_url(
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')["data"]["episode"]["audio"]["items"][-1]["url"]
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')[1]["data"]["episode"]["audio"]["items"][-1]["url"]
download_directory = os.path.join(
os.path.dirname(__file__),
ZSpotify.CONFIG.get_root_podcast_path(),
extra_paths,
)
download_directory = os.path.join(ZSpotify.CONFIG.get_root_podcast_path(), extra_paths)
download_directory = os.path.realpath(download_directory)
create_download_directory(download_directory)

View File

@@ -1,20 +1,20 @@
import os
import re
import time
import uuid
from typing import Any, Tuple, List
from librespot.audio.decoders import AudioQuality
from librespot.metadata import TrackId
from ffmpy import FFmpeg
from pydub import AudioSegment
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
from termoutput import Printer, PrintChannel
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
from zspotify import ZSpotify
import traceback
def get_saved_tracks() -> list:
""" Returns user's saved tracks """
@@ -35,27 +35,34 @@ def get_saved_tracks() -> list:
def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any, Any, int]:
""" Retrieves metadata for downloaded songs """
info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
(raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(data[NAME])
album_name = info[TRACKS][0][ALBUM][NAME]
name = info[TRACKS][0][NAME]
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
duration_ms = info[TRACKS][0][DURATION_MS]
if not TRACKS in info:
raise ValueError(f'Invalid response from TRACKS_URL:\n{raw}')
try:
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(data[NAME])
album_name = info[TRACKS][0][ALBUM][NAME]
name = info[TRACKS][0][NAME]
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
duration_ms = info[TRACKS][0][DURATION_MS]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
except Exception as e:
raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw}')
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
def get_song_duration(song_id: str) -> float:
""" Retrieves duration of song in second as is on spotify """
resp = ZSpotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
(raw, resp) = ZSpotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
# get duration in miliseconds
ms_duration = resp['duration_ms']
@@ -83,6 +90,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
for k in extra_keys:
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
ext = EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower())
output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
@@ -91,11 +100,15 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower()))
output_template = output_template.replace("{ext}", ext)
filename = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), output_template)
filename = os.path.join(ZSpotify.CONFIG.get_root_path(), output_template)
filedir = os.path.dirname(filename)
filename_temp = filename
if ZSpotify.CONFIG.get_temp_download_dir() != '':
filename_temp = os.path.join(ZSpotify.CONFIG.get_temp_download_dir(), f'zspotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = os.path.isfile(filename) and os.path.getsize(filename)
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
@@ -112,7 +125,9 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
else:
try:
if not is_playable:
@@ -128,12 +143,13 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
if track_id != scraped_song_id:
track_id = scraped_song_id
track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream(
track_id, ZSpotify.DOWNLOAD_QUALITY)
stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
total_size = stream.input_stream.size
with open(filename, 'wb') as file, Printer.progress(
time_start = time.time()
downloaded = 0
with open(filename_temp, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
@@ -141,18 +157,28 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
pause = duration_ms / ZSpotify.CONFIG.get_chunk_size()
for chunk in range(int(total_size / ZSpotify.CONFIG.get_chunk_size()) + 1):
data = stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
if ZSpotify.CONFIG.get_download_real_time():
time.sleep(pause)
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
time_downloaded = time.time()
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, os.path.dirname(__file__))}" ###' + "\n")
convert_audio_format(filename_temp)
set_audio_tags(filename_temp, artists, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename_temp, image_url)
if filename_temp != filename:
os.rename(filename_temp, filename)
time_finished = time.time()
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, ZSpotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
# add song id to archive file
if ZSpotify.CONFIG.get_skip_previously_downloaded():
@@ -165,9 +191,11 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id) + "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
if os.path.exists(filename):
os.remove(filename)
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
if os.path.exists(filename_temp):
os.remove(filename_temp)
def convert_audio_format(filename) -> None:

View File

@@ -1,9 +1,9 @@
import datetime
import math
import os
import platform
import re
import subprocess
import time
from enum import Enum
from typing import List, Tuple
@@ -30,11 +30,12 @@ def create_download_directory(download_path: str) -> None:
with open(hidden_file_path, 'w', encoding='utf-8') as f:
pass
def get_previously_downloaded() -> List[str]:
""" Returns list of all time downloaded songs """
ids = []
archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
archive_path = ZSpotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
with open(archive_path, 'r', encoding='utf-8') as f:
@@ -42,10 +43,11 @@ def get_previously_downloaded() -> List[str]:
return ids
def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Adds song id to all time installed songs archive """
archive_path = os.path.join(os.path.dirname(__file__), ZSpotify.CONFIG.get_root_path(), ZSpotify.CONFIG.get_song_archive())
archive_path = ZSpotify.CONFIG.get_song_archive()
if os.path.exists(archive_path):
with open(archive_path, 'a', encoding='utf-8') as file:
@@ -54,6 +56,7 @@ def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str
with open(archive_path, 'w', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_directory_song_ids(download_path: str) -> List[str]:
""" Gets song ids of songs in directory """
@@ -66,6 +69,7 @@ def get_directory_song_ids(download_path: str) -> List[str]:
return song_ids
def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Appends song_id to .song_ids file in directory """
@@ -75,6 +79,7 @@ def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, a
with open(hidden_file_path, 'a', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_downloaded_song_duration(filename: str) -> float:
""" Returns the downloaded file's duration in seconds """
@@ -251,3 +256,26 @@ def fix_filename(name):
True
"""
return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", str(name), flags=re.IGNORECASE)
def fmt_seconds(secs: float) -> str:
val = math.floor(secs)
s = math.floor(val % 60)
val -= s
val /= 60
m = math.floor(val % 60)
val -= m
val /= 60
h = math.floor(val)
if h == 0 and m == 0 and s == 0:
return "0"
elif h == 0 and m == 0:
return f'{s}'.zfill(2)
elif h == 0:
return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
else:
return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)

View File

@@ -6,18 +6,16 @@ It's like youtube-dl, but for Spotify.
(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org)
"""
import json
import os
import os.path
from getpass import getpass
from typing import Any
import requests
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from const import TYPE, \
PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, \
PREMIUM, USER_READ_EMAIL, OFFSET, LIMIT, \
PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ
from config import Config
@@ -35,7 +33,7 @@ class ZSpotify:
def login(cls):
""" Authenticates with Spotify and saves credentials to a file """
cred_location = os.path.join(os.getcwd(), Config.get_credentials_location())
cred_location = Config.get_credentials_location()
if os.path.isfile(cred_location):
try:
@@ -49,7 +47,8 @@ class ZSpotify:
user_name = input('Username: ')
password = getpass()
try:
cls.SESSION = Session.Builder().user_pass(user_name, password).create()
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
return
except RuntimeError:
pass
@@ -85,7 +84,8 @@ class ZSpotify:
@classmethod
def invoke_url(cls, url):
headers = cls.get_auth_header()
return requests.get(url, headers=headers).json()
response = requests.get(url, headers=headers)
return response.text, response.json()
@classmethod
def check_premium(cls) -> bool: