mirror of
https://github.com/THIS-IS-NOT-A-BACKUP/zspotify.git
synced 2025-07-05 08:54:45 +00:00
Merge remote-tracking branch 'upstream/main'
This commit is contained in:
commit
ae338b9a86
@ -27,7 +27,8 @@ def get_show_episodes(show_id_str) -> list:
|
|||||||
limit = 50
|
limit = 50
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
resp = ZSpotify.invoke_url_with_params(f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
|
resp = ZSpotify.invoke_url_with_params(
|
||||||
|
f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
|
||||||
offset += limit
|
offset += limit
|
||||||
for episode in resp[ITEMS]:
|
for episode in resp[ITEMS]:
|
||||||
episodes.append(episode[ID])
|
episodes.append(episode[ID])
|
||||||
@ -37,6 +38,33 @@ def get_show_episodes(show_id_str) -> list:
|
|||||||
return episodes
|
return episodes
|
||||||
|
|
||||||
|
|
||||||
|
def download_podcast_directly(url, filename):
|
||||||
|
import functools
|
||||||
|
import pathlib
|
||||||
|
import shutil
|
||||||
|
import requests
|
||||||
|
from tqdm.auto import tqdm
|
||||||
|
|
||||||
|
r = requests.get(url, stream=True, allow_redirects=True)
|
||||||
|
if r.status_code != 200:
|
||||||
|
r.raise_for_status() # Will only raise for 4xx codes, so...
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Request to {url} returned status code {r.status_code}")
|
||||||
|
file_size = int(r.headers.get('Content-Length', 0))
|
||||||
|
|
||||||
|
path = pathlib.Path(filename).expanduser().resolve()
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
desc = "(Unknown total file size)" if file_size == 0 else ""
|
||||||
|
r.raw.read = functools.partial(
|
||||||
|
r.raw.read, decode_content=True) # Decompress if needed
|
||||||
|
with tqdm.wrapattr(r.raw, "read", total=file_size, desc=desc) as r_raw:
|
||||||
|
with path.open("wb") as f:
|
||||||
|
shutil.copyfileobj(r_raw, f)
|
||||||
|
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
def download_episode(episode_id) -> None:
|
def download_episode(episode_id) -> None:
|
||||||
podcast_name, episode_name = get_episode_info(episode_id)
|
podcast_name, episode_name = get_episode_info(episode_id)
|
||||||
|
|
||||||
@ -47,8 +75,8 @@ def download_episode(episode_id) -> None:
|
|||||||
else:
|
else:
|
||||||
filename = podcast_name + ' - ' + episode_name
|
filename = podcast_name + ' - ' + episode_name
|
||||||
|
|
||||||
episode_id = EpisodeId.from_base62(episode_id)
|
direct_download_url = ZSpotify.invoke_url(
|
||||||
stream = ZSpotify.get_content_stream(episode_id, ZSpotify.DOWNLOAD_QUALITY)
|
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')["data"]["episode"]["audio"]["items"][-1]["url"]
|
||||||
|
|
||||||
download_directory = os.path.join(
|
download_directory = os.path.join(
|
||||||
os.path.dirname(__file__),
|
os.path.dirname(__file__),
|
||||||
@ -58,33 +86,41 @@ def download_episode(episode_id) -> None:
|
|||||||
download_directory = os.path.realpath(download_directory)
|
download_directory = os.path.realpath(download_directory)
|
||||||
create_download_directory(download_directory)
|
create_download_directory(download_directory)
|
||||||
|
|
||||||
total_size = stream.input_stream.size
|
if "anon-podcast.scdn.co" in direct_download_url:
|
||||||
|
episode_id = EpisodeId.from_base62(episode_id)
|
||||||
|
stream = ZSpotify.get_content_stream(
|
||||||
|
episode_id, ZSpotify.DOWNLOAD_QUALITY)
|
||||||
|
|
||||||
filepath = os.path.join(download_directory, f"{filename}.ogg")
|
total_size = stream.input_stream.size
|
||||||
if (
|
|
||||||
os.path.isfile(filepath)
|
|
||||||
and os.path.getsize(filepath) == total_size
|
|
||||||
and ZSpotify.get_config(SKIP_EXISTING_FILES)
|
|
||||||
):
|
|
||||||
print(
|
|
||||||
"\n### SKIPPING:",
|
|
||||||
podcast_name,
|
|
||||||
"-",
|
|
||||||
episode_name,
|
|
||||||
"(EPISODE ALREADY EXISTS) ###",
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
with open(filepath, 'wb') as file, tqdm(
|
filepath = os.path.join(download_directory, f"{filename}.ogg")
|
||||||
desc=filename,
|
if (
|
||||||
total=total_size,
|
os.path.isfile(filepath)
|
||||||
unit='B',
|
and os.path.getsize(filepath) == total_size
|
||||||
unit_scale=True,
|
and ZSpotify.get_config(SKIP_EXISTING_FILES)
|
||||||
unit_divisor=1024
|
):
|
||||||
) as bar:
|
print(
|
||||||
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
|
"\n### SKIPPING:",
|
||||||
bar.update(file.write(
|
podcast_name,
|
||||||
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
|
"-",
|
||||||
|
episode_name,
|
||||||
|
"(EPISODE ALREADY EXISTS) ###",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(filepath, 'wb') as file, tqdm(
|
||||||
|
desc=filename,
|
||||||
|
total=total_size,
|
||||||
|
unit='B',
|
||||||
|
unit_scale=True,
|
||||||
|
unit_divisor=1024
|
||||||
|
) as bar:
|
||||||
|
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
|
||||||
|
bar.update(file.write(
|
||||||
|
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
|
||||||
|
else:
|
||||||
|
filepath = os.path.join(download_directory, f"{filename}.mp3")
|
||||||
|
download_podcast_directly(direct_download_url, filepath)
|
||||||
|
|
||||||
# convert_audio_format(ROOT_PODCAST_PATH +
|
# convert_audio_format(ROOT_PODCAST_PATH +
|
||||||
# extra_paths + filename + '.ogg')
|
# extra_paths + filename + '.ogg')
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import time
|
import time
|
||||||
from typing import Any, Tuple, List
|
from typing import Any, Tuple, List
|
||||||
|
|
||||||
@ -11,7 +12,8 @@ from tqdm import tqdm
|
|||||||
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
|
||||||
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \
|
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \
|
||||||
SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, BITRATE, CODEC_MAP, EXT_MAP, DOWNLOAD_REAL_TIME
|
SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, BITRATE, CODEC_MAP, EXT_MAP, DOWNLOAD_REAL_TIME
|
||||||
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory
|
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
|
||||||
|
get_directory_song_ids, add_to_directory_song_ids
|
||||||
from zspotify import ZSpotify
|
from zspotify import ZSpotify
|
||||||
|
|
||||||
|
|
||||||
@ -74,6 +76,18 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
|
|||||||
filename = os.path.join(
|
filename = os.path.join(
|
||||||
download_directory, f'{song_name}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT).lower())}')
|
download_directory, f'{song_name}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT).lower())}')
|
||||||
|
|
||||||
|
check_name = os.path.isfile(filename) and os.path.getsize(filename)
|
||||||
|
check_id = scraped_song_id in get_directory_song_ids(download_directory)
|
||||||
|
|
||||||
|
# a song with the same name is installed
|
||||||
|
if not check_id and check_name:
|
||||||
|
c = len([file for file in os.listdir(download_directory)
|
||||||
|
if re.search(f'^{song_name}_', file)]) + 1
|
||||||
|
|
||||||
|
filename = os.path.join(
|
||||||
|
download_directory, f'{song_name}_{c}.{EXT_MAP.get(ZSpotify.get_config(DOWNLOAD_FORMAT))}')
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
|
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
|
||||||
print(e)
|
print(e)
|
||||||
@ -83,7 +97,7 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
|
|||||||
print('\n### SKIPPING:', song_name,
|
print('\n### SKIPPING:', song_name,
|
||||||
'(SONG IS UNAVAILABLE) ###')
|
'(SONG IS UNAVAILABLE) ###')
|
||||||
else:
|
else:
|
||||||
if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
|
if check_id and check_name and ZSpotify.get_config(SKIP_EXISTING_FILES):
|
||||||
print('\n### SKIPPING:', song_name,
|
print('\n### SKIPPING:', song_name,
|
||||||
'(SONG ALREADY EXISTS) ###')
|
'(SONG ALREADY EXISTS) ###')
|
||||||
else:
|
else:
|
||||||
@ -119,6 +133,10 @@ def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='',
|
|||||||
release_year, disc_number, track_number)
|
release_year, disc_number, track_number)
|
||||||
set_music_thumbnail(filename, image_url)
|
set_music_thumbnail(filename, image_url)
|
||||||
|
|
||||||
|
# add song id to download directory's .song_ids file
|
||||||
|
if not check_id:
|
||||||
|
add_to_directory_song_ids(download_directory, scraped_song_id)
|
||||||
|
|
||||||
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
|
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
|
||||||
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
|
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -15,11 +15,38 @@ from const import ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTW
|
|||||||
class MusicFormat(str, Enum):
|
class MusicFormat(str, Enum):
|
||||||
MP3 = 'mp3',
|
MP3 = 'mp3',
|
||||||
OGG = 'ogg',
|
OGG = 'ogg',
|
||||||
|
|
||||||
|
|
||||||
def create_download_directory(download_path: str) -> None:
|
def create_download_directory(download_path: str) -> None:
|
||||||
|
""" Create directory and add a hidden file with song ids """
|
||||||
os.makedirs(download_path, exist_ok=True)
|
os.makedirs(download_path, exist_ok=True)
|
||||||
|
|
||||||
|
# add hidden file with song ids
|
||||||
|
hidden_file_path = os.path.join(download_path, '.song_ids')
|
||||||
|
if not os.path.isfile(hidden_file_path):
|
||||||
|
with open(hidden_file_path, 'w', encoding='utf-8') as f:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_directory_song_ids(download_path: str) -> List[str]:
|
||||||
|
""" Gets song ids of songs in directory """
|
||||||
|
|
||||||
|
song_ids = []
|
||||||
|
|
||||||
|
hidden_file_path = os.path.join(download_path, '.song_ids')
|
||||||
|
if os.path.isfile(hidden_file_path):
|
||||||
|
with open(hidden_file_path, 'r', encoding='utf-8') as file:
|
||||||
|
song_ids.extend([line.strip() for line in file.readlines()])
|
||||||
|
|
||||||
|
return song_ids
|
||||||
|
|
||||||
|
def add_to_directory_song_ids(download_path: str, song_id: str) -> None:
|
||||||
|
""" Appends song_id to .song_ids file in directory """
|
||||||
|
|
||||||
|
hidden_file_path = os.path.join(download_path, '.song_ids')
|
||||||
|
# not checking if file exists because we need an exception
|
||||||
|
# to be raised if something is wrong
|
||||||
|
with open(hidden_file_path, 'a', encoding='utf-8') as file:
|
||||||
|
file.write(f'{song_id}\n')
|
||||||
|
|
||||||
def wait(seconds: int = 3) -> None:
|
def wait(seconds: int = 3) -> None:
|
||||||
""" Pause for a set number of seconds """
|
""" Pause for a set number of seconds """
|
||||||
|
Loading…
x
Reference in New Issue
Block a user