mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2024-11-27 18:33:17 +01:00
172 lines
7.7 KiB
Python
172 lines
7.7 KiB
Python
import math
|
|
import os
|
|
import time
|
|
from typing import Any
|
|
|
|
from librespot.audio.decoders import AudioQuality
|
|
from librespot.metadata import TrackId
|
|
from pydub import AudioSegment
|
|
from tqdm import tqdm
|
|
|
|
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
|
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \
|
|
SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, DURATION_MS
|
|
from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \
|
|
MusicFormat
|
|
from zspotify import ZSpotify
|
|
|
|
|
|
def get_saved_tracks() -> list:
|
|
""" Returns user's saved tracks """
|
|
songs = []
|
|
offset = 0
|
|
limit = 50
|
|
|
|
while True:
|
|
resp = ZSpotify.invoke_url_with_params(SAVED_TRACKS_URL, limit=limit, offset=offset)
|
|
offset += limit
|
|
songs.extend(resp[ITEMS])
|
|
if len(resp[ITEMS]) < limit:
|
|
break
|
|
|
|
return songs
|
|
|
|
|
|
def get_song_info(song_id) -> tuple[list[str], str, str, Any, Any, Any, Any, Any, Any, Any]:
|
|
""" Retrieves metadata for downloaded songs """
|
|
info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
|
|
|
|
artists = []
|
|
for data in info[TRACKS][0][ARTISTS]:
|
|
artists.append(data[NAME])
|
|
album_name = info[TRACKS][0][ALBUM][NAME]
|
|
name = info[TRACKS][0][NAME]
|
|
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
|
|
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
|
|
disc_number = info[TRACKS][0][DISC_NUMBER]
|
|
track_number = info[TRACKS][0][TRACK_NUMBER]
|
|
scraped_song_id = info[TRACKS][0][ID]
|
|
is_playable = info[TRACKS][0][IS_PLAYABLE]
|
|
duration = math.ceil(info[TRACKS][0][DURATION_MS] / 1000)
|
|
|
|
return (artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable,
|
|
duration)
|
|
|
|
|
|
# noinspection PyBroadException
|
|
def download_track(track_id: str, extra_paths: str = '', prefix: bool = False, prefix_value='',
|
|
disable_progressbar: bool = False,
|
|
create_m3u_file: bool = False) -> None:
|
|
""" Downloads raw song audio from Spotify """
|
|
try:
|
|
(artists, album_name, name, image_url, release_year, disc_number,
|
|
track_number, scraped_song_id, is_playable, duration) = get_song_info(track_id)
|
|
song_name, filename, m3u_filename = process_track_metadata(artists, name, disc_number, extra_paths, prefix,
|
|
prefix_value, create_m3u_file)
|
|
except Exception:
|
|
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
|
|
else:
|
|
try:
|
|
track_info = (
|
|
artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id,
|
|
is_playable, duration)
|
|
playlist_info = (create_m3u_file, m3u_filename)
|
|
creat_track(track_id, extra_paths, song_name, filename, disable_progressbar, track_info, playlist_info)
|
|
except Exception:
|
|
print('### SKIPPING:', song_name,
|
|
'(GENERAL DOWNLOAD ERROR) ###')
|
|
if os.path.exists(filename):
|
|
os.remove(filename)
|
|
|
|
|
|
def process_track_metadata(artists: list, name: str, disc_number: Any, extra_paths: str, prefix: bool,
|
|
prefix_value: str,
|
|
create_m3u_file: bool):
|
|
m3u_filename = None
|
|
|
|
song_name = sanitize_data(artists[0]) + ' - ' + sanitize_data(name)
|
|
if prefix:
|
|
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit(
|
|
) else f'{prefix_value} - {song_name}'
|
|
if create_m3u_file:
|
|
m3u_filename = f'{os.path.join(ZSpotify.get_config(ROOT_PATH), extra_paths, extra_paths)[:-1]}.m3u'
|
|
if ZSpotify.get_config(SPLIT_ALBUM_DISCS):
|
|
filename = os.path.join(ZSpotify.get_config(ROOT_PATH), extra_paths, 'Disc ' + str(
|
|
disc_number) + '/' + song_name + '.' + ZSpotify.get_config(DOWNLOAD_FORMAT))
|
|
else:
|
|
filename = os.path.join(ZSpotify.get_config(ROOT_PATH), extra_paths,
|
|
song_name + '.' + ZSpotify.get_config(DOWNLOAD_FORMAT))
|
|
return song_name, filename, m3u_filename
|
|
|
|
|
|
def creat_track(track_id, extra_paths: str, song_name: str, filename: str, disable_progressbar: bool, track_info: tuple,
|
|
playlist_info: tuple):
|
|
(artists, album_name, name, image_url, release_year, disc_number,
|
|
track_number, scraped_song_id, is_playable, duration) = track_info
|
|
create_m3u, m3u_filename = playlist_info
|
|
if not is_playable:
|
|
print(f'\n### SKIPPING: {song_name} (SONG IS UNAVAILABLE) ###')
|
|
else:
|
|
if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
|
|
playlist_data = f'#EXTINF:{duration}, {artists[0]} - {name}\n{os.path.abspath(filename)}'
|
|
create_playlist_file(create_m3u, playlist_data, m3u_filename)
|
|
print(f'\n### SKIPPING: {song_name} (SONG ALREADY EXISTS) ###')
|
|
else:
|
|
if track_id != scraped_song_id:
|
|
track_id = scraped_song_id
|
|
track_id = TrackId.from_base62(track_id)
|
|
stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
|
|
create_download_directory(ZSpotify.get_config(ROOT_PATH) + extra_paths)
|
|
total_size = stream.input_stream.size
|
|
|
|
write_stream_to_file(stream, filename, song_name, total_size, disable_progressbar, track_info,
|
|
playlist_info)
|
|
|
|
|
|
def write_stream_to_file(stream, filename: str, song_name: str, total_size: Any, disable_progressbar: bool,
|
|
track_info: tuple, playlist_info: tuple):
|
|
(artists, album_name, name, image_url, release_year, disc_number,
|
|
track_number, scraped_song_id, is_playable, duration) = track_info
|
|
create_m3u, m3u_filename = playlist_info
|
|
with open(filename, 'wb') as file, tqdm(
|
|
desc=song_name,
|
|
total=total_size,
|
|
unit='B',
|
|
unit_scale=True,
|
|
unit_divisor=1024,
|
|
disable=disable_progressbar
|
|
) as p_bar:
|
|
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
|
|
p_bar.update(file.write(
|
|
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
|
|
playlist_data = f'#EXTINF:{duration}, {artists[0]} - {name}\n{os.path.abspath(filename)}'
|
|
if ZSpotify.get_config(DOWNLOAD_FORMAT) == MusicFormat.MP3.value:
|
|
convert_audio_format(filename)
|
|
set_audio_tags(filename, artists, name, album_name,
|
|
release_year, disc_number, track_number)
|
|
set_music_thumbnail(filename, image_url)
|
|
|
|
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
|
|
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
|
|
create_playlist_file(create_m3u, playlist_data, m3u_filename)
|
|
|
|
|
|
def create_playlist_file(create_m3u: bool, playlist_data: str, m3u_filename: str):
|
|
if create_m3u and m3u_filename and playlist_data:
|
|
with open(m3u_filename, 'a+') as pfile:
|
|
if os.path.getsize(m3u_filename) == 0:
|
|
pfile.write('#EXTM3U\n')
|
|
pfile.write(playlist_data + '\n')
|
|
|
|
|
|
def convert_audio_format(filename) -> None:
|
|
""" Converts raw audio into playable mp3 """
|
|
# print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###')
|
|
raw_audio = AudioSegment.from_file(filename, format=MusicFormat.OGG.value,
|
|
frame_rate=44100, channels=2, sample_width=2)
|
|
if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH:
|
|
bitrate = '320k'
|
|
else:
|
|
bitrate = '160k'
|
|
raw_audio.export(filename, format=ZSpotify.get_config(DOWNLOAD_FORMAT), bitrate=bitrate)
|