mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2025-07-01 23:33:11 +00:00
fixed user playlist download issue
1. added new scope `playlist-read-private` for access token 2. Added progress bar far playlist download(previously it's showing for each song separately)
This commit is contained in:
parent
714e2eccfb
commit
65093fa0d2
15
src/app.py
15
src/app.py
@ -6,7 +6,7 @@ from tabulate import tabulate
|
||||
from album import download_album, download_artist_albums
|
||||
from const import TRACK, NAME, ID, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUMS, OWNER, \
|
||||
PLAYLISTS, DISPLAY_NAME
|
||||
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist
|
||||
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
|
||||
from podcast import download_episode, get_show_episodes
|
||||
from track import download_track, get_saved_tracks
|
||||
from utils import sanitize_data, splash, split_input, regex_input_for_urls
|
||||
@ -126,14 +126,14 @@ def search(search_term):
|
||||
|
||||
artists = resp[ARTISTS][ITEMS]
|
||||
if len(artists) > 0:
|
||||
print("### ARTISTS ###")
|
||||
print('### ARTISTS ###')
|
||||
artist_data = []
|
||||
for artist in artists:
|
||||
artist_data.append([counter, artist[NAME]])
|
||||
counter += 1
|
||||
total_artists = counter - total_tracks - total_albums - 1
|
||||
print(tabulate(artist_data, headers=['S.NO', 'Name'], tablefmt='pretty'))
|
||||
print("\n")
|
||||
print('\n')
|
||||
else:
|
||||
total_artists = 0
|
||||
|
||||
@ -161,14 +161,7 @@ def search(search_term):
|
||||
elif position <= total_artists + total_tracks + total_albums:
|
||||
download_artist_albums(artists[position - total_tracks - total_albums - 1][ID])
|
||||
else:
|
||||
playlist_choice = playlists[position -
|
||||
total_tracks - total_albums - total_artists - 1]
|
||||
playlist_songs = get_playlist_songs(playlist_choice[ID])
|
||||
for song in playlist_songs:
|
||||
if song[TRACK][ID] is not None:
|
||||
download_track(song[TRACK][ID], sanitize_data(playlist_choice[NAME].strip()) + "/",
|
||||
disable_progressbar=True)
|
||||
print("\n")
|
||||
download_playlist(playlists, position - total_tracks - total_albums - total_artists)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -68,6 +68,8 @@ PREMIUM = 'premium'
|
||||
|
||||
USER_READ_EMAIL = 'user-read-email'
|
||||
|
||||
PLAYLIST_READ_PRIVATE = 'playlist-read-private'
|
||||
|
||||
WINDOWS_SYSTEM = 'Windows'
|
||||
|
||||
CREDENTIALS_JSON = '../credentials.json'
|
||||
|
@ -1,3 +1,5 @@
|
||||
from tqdm import tqdm
|
||||
|
||||
from const import ITEMS, ID, TRACK, NAME
|
||||
from track import download_track
|
||||
from utils import sanitize_data
|
||||
@ -45,15 +47,15 @@ def get_playlist_info(playlist_id):
|
||||
return resp['name'].strip(), resp['owner']['display_name'].strip()
|
||||
|
||||
|
||||
def download_playlist(playlists, playlist_choice):
|
||||
def download_playlist(playlists, playlist_number):
|
||||
"""Downloads all the songs from a playlist"""
|
||||
playlist_songs = get_playlist_songs(playlists[int(playlist_choice) - 1][ID])
|
||||
|
||||
for song in playlist_songs:
|
||||
if song[TRACK][ID] is not None:
|
||||
download_track(song[TRACK][ID], sanitize_data(
|
||||
playlists[int(playlist_choice) - 1][NAME].strip()) + '/')
|
||||
print('\n')
|
||||
playlist_songs = [song for song in get_playlist_songs(playlists[int(playlist_number) - 1][ID]) if song[TRACK][ID]]
|
||||
p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
|
||||
for song in p_bar:
|
||||
download_track(song[TRACK][ID], sanitize_data(playlists[int(playlist_number) - 1][NAME].strip()) + '/',
|
||||
disable_progressbar=True)
|
||||
p_bar.set_description(song[TRACK][NAME])
|
||||
|
||||
|
||||
def download_from_user_playlist():
|
||||
@ -79,7 +81,7 @@ def download_from_user_playlist():
|
||||
|
||||
print(f'Downloading from {start} to {end}...')
|
||||
|
||||
for playlist in range(start, end):
|
||||
download_playlist(playlists, playlist)
|
||||
for playlist_number in range(start, end):
|
||||
download_playlist(playlists, playlist_number)
|
||||
|
||||
print('\n**All playlists have been downloaded**\n')
|
||||
|
@ -73,11 +73,11 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
|
||||
else:
|
||||
try:
|
||||
if not is_playable:
|
||||
print('### SKIPPING:', song_name,
|
||||
print('\n### SKIPPING:', song_name,
|
||||
'(SONG IS UNAVAILABLE) ###')
|
||||
else:
|
||||
if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
|
||||
print('### SKIPPING:', song_name,
|
||||
print('\n### SKIPPING:', song_name,
|
||||
'(SONG ALREADY EXISTS) ###')
|
||||
else:
|
||||
if track_id != scraped_song_id:
|
||||
@ -107,7 +107,7 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
|
||||
|
||||
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
|
||||
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
print('### SKIPPING:', song_name,
|
||||
'(GENERAL DOWNLOAD ERROR) ###')
|
||||
if os.path.exists(filename):
|
||||
|
@ -17,7 +17,8 @@ from librespot.audio.decoders import VorbisOnlyAudioQuality
|
||||
from librespot.core import Session
|
||||
|
||||
from const import CREDENTIALS_JSON, TYPE, \
|
||||
PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, RAW_AUDIO_AS_IS
|
||||
PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, RAW_AUDIO_AS_IS, \
|
||||
PLAYLIST_READ_PRIVATE
|
||||
from utils import MusicFormat
|
||||
|
||||
|
||||
@ -68,13 +69,18 @@ class ZSpotify:
|
||||
def get_content_stream(cls, content_id, quality):
|
||||
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
|
||||
|
||||
@classmethod
|
||||
def __get_auth_token(cls):
|
||||
return cls.SESSION.tokens().get_token(USER_READ_EMAIL, PLAYLIST_READ_PRIVATE).access_token
|
||||
|
||||
@classmethod
|
||||
def get_auth_header(cls):
|
||||
return {AUTHORIZATION: f'Bearer {cls.SESSION.tokens().get(USER_READ_EMAIL)}'}
|
||||
return {
|
||||
AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'}
|
||||
|
||||
@classmethod
|
||||
def get_auth_header_and_params(cls, limit, offset):
|
||||
return {AUTHORIZATION: f'Bearer {cls.SESSION.tokens().get(USER_READ_EMAIL)}'}, {LIMIT: limit, OFFSET: offset}
|
||||
return {AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'}, {LIMIT: limit, OFFSET: offset}
|
||||
|
||||
@classmethod
|
||||
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
|
||||
|
Loading…
x
Reference in New Issue
Block a user