diff --git a/config.py b/config.py index c214754..6138594 100644 --- a/config.py +++ b/config.py @@ -23,4 +23,6 @@ SYNC_CONFIG = { "jitter_tolerance_ms": 300, "min_drift_slope": 0.00005, "linear_r2_threshold": 0.80, -} \ No newline at end of file + + "create_backup": True, +} diff --git a/core/subtitles.py b/core/subtitles.py index 3c38571..211fad7 100644 --- a/core/subtitles.py +++ b/core/subtitles.py @@ -1,7 +1,9 @@ import os import re +import shutil from typing import List +from config import SYNC_CONFIG from .types import SubtitleEntry @@ -41,9 +43,28 @@ class SubtitleHandler: entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip())) return entries + @staticmethod + def create_backup(filepath: str): + """Creates a .bak copy of the subtitles if one doesn't exist.""" + backup_path = filepath + ".bak" + if not os.path.exists(backup_path): + try: + shutil.copy2(filepath, backup_path) + print(f"Backup created: {os.path.basename(backup_path)}") + except IOError as e: + print(f"Warning: Could not create backup: {e}") + @staticmethod def write_srt(filepath: str, entries: List[SubtitleEntry]): - with open(filepath, 'w', encoding='utf-8') as f: - for entry in entries: - f.write( - f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n") + # 1. Ensure backup exists before overwriting + if SYNC_CONFIG['create_backup']: + SubtitleHandler.create_backup(filepath) + + # 2. Overwrite + try: + with open(filepath, 'w', encoding='utf-8') as f: + for entry in entries: + f.write( + f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n") + except IOError as e: + print(f"Error writing subtitle file: {e}") diff --git a/core/utils.py b/core/utils.py new file mode 100644 index 0000000..3077fb9 --- /dev/null +++ b/core/utils.py @@ -0,0 +1,18 @@ +from .types import SubtitleInfo + + +def parse_bazarr_args(args: list) -> SubtitleInfo: + """Parses key=value arguments passed by Bazarr.""" + arg_dict = {} + for arg in args[1:]: + if '=' in arg: + key, value = arg.split('=', 1) + arg_dict[key] = value.strip('"').strip("'") # Clean quotes if present + + return SubtitleInfo( + episode_path=arg_dict.get('episode', ''), + episode_name=arg_dict.get('episode_name', 'Unknown'), + subtitle_path=arg_dict.get('subtitles', ''), + episode_language=arg_dict.get('episode_language', 'English'), + subtitles_language=arg_dict.get('subtitles_language', 'English') + ) diff --git a/main.py b/main.py index d1cae98..dacb661 100644 --- a/main.py +++ b/main.py @@ -9,40 +9,28 @@ from core.analysis import Analyzer from core.matcher import TextMatcher from core.media import MediaHandler from core.subtitles import SubtitleHandler -from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint - - -def parse_bazarr_args(args) -> SubtitleInfo: - arg_dict = {} - for arg in args[1:]: - if '=' in arg: - key, value = arg.split('=', 1) - arg_dict[key] = value - return SubtitleInfo( - episode_path=arg_dict.get('episode', ''), - episode_name=arg_dict.get('episode_name', 'Unknown'), - subtitle_path=arg_dict.get('subtitles', ''), - episode_language=arg_dict.get('episode_language', 'English'), - subtitles_language=arg_dict.get('subtitles_language', 'English') - ) +from core.types import WhisperSegment, AnalysisPoint +from core.utils import parse_bazarr_args def main(): + # 1. Parse Arguments info = parse_bazarr_args(sys.argv) + if not info.subtitle_path or not os.path.exists(info.subtitle_path): + print("Error: Invalid or missing subtitle path.") + return + print(f"Target: {info.episode_name}") - # 1. Init + # 2. Initialize Resources audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language) media_duration = MediaHandler.get_media_duration(info.episode_path) - - # Get the 2-letter code (e.g., "en") - whisper_lang_code = MediaHandler.get_language_code(info.episode_language) + whisper_lang = MediaHandler.get_language_code(info.episode_language) print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...") - # Load model based on Config - model_name = "base.en" if whisper_lang_code == 'en' else "base" - + # Load Model + model_name = "base.en" if whisper_lang == 'en' else "base" try: whisper = WhisperModel( model_name, @@ -51,15 +39,12 @@ def main(): cpu_threads=4 ) except Exception as e: - print(f"Error loading model: {e}") + print(f"CRITICAL: Failed to load Whisper model: {e}") return subtitles = SubtitleHandler.parse_srt(info.subtitle_path) - if not subtitles: - print("Error: Subtitle file is empty.") - return - # 2. Scanning Loop + # 3. Scanning Loop usable_duration = media_duration - 60 step = usable_duration / (SYNC_CONFIG['sample_count'] + 1) sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])] @@ -75,14 +60,14 @@ def main(): info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream ) + # Optimized Transcribe segments, _ = whisper.transcribe( audio_file, vad_filter=SYNC_CONFIG['vad_filter'], vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']), - language=whisper_lang_code, + language=whisper_lang, beam_size=SYNC_CONFIG['beam_size'], - condition_on_previous_text=False, - word_timestamps=False + condition_on_previous_text=False ) w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)] @@ -103,10 +88,10 @@ def main(): os.unlink(audio_file) if not raw_points: - print("FAILED: No sync points found.") + print("FAILED: No sync points found. Exiting.") return - # 3. Decision + # 4. Analysis & Decision raw_points.sort(key=lambda x: x.timestamp_ms) clean_points = Analyzer.filter_outliers(raw_points) @@ -128,17 +113,22 @@ def main(): final_intercept = statistics.median([p.offset_ms for p in clean_points]) print(f"Applying Global Offset: {final_intercept:+.0f} ms") + if abs(final_intercept) < 50: + print("Offset is negligible. No changes needed.") + return + elif mode == "LINEAR": final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points) print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms") elif mode == "ELASTIC": anchors = Analyzer.smooth_points(clean_points) + # Extend anchors to cover 0 to End final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \ [AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)] print("Applying Non-Linear (Elastic) Map.") - # 4. Apply + # 5. Application count = 0 for sub in subtitles: new_start, new_end = sub.start_ms, sub.end_ms @@ -190,7 +180,6 @@ if __name__ == '__main__': 'subtitles_language=English' ] - # sys.argv = [ # 'sync_script.py', # 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv', diff --git a/requirements.txt b/requirements.txt index 591bf8d..c10414e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -git+https://github.com/absadiki/pywhispercpp \ No newline at end of file +faster-whisper \ No newline at end of file