Files
subtitles-sync-whisper/main.py
2026-01-30 19:41:16 +01:00

170 lines
6.7 KiB
Python

import os
import statistics
import sys
from faster_whisper import WhisperModel
from config import SYNC_CONFIG
from core.analysis import Analyzer
from core.matcher import TextMatcher
from core.media import MediaHandler
from core.subtitles import SubtitleHandler
from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint
def parse_bazarr_args(args) -> SubtitleInfo:
    """Turn ``key=value`` command-line tokens into a ``SubtitleInfo``.

    The first element of ``args`` (the script name) is skipped, and tokens
    without an ``=`` are ignored. Each remaining token is split on its first
    ``=`` only, so values may themselves contain ``=``. When a key appears
    more than once, the last occurrence wins.

    Args:
        args: Argument list in ``sys.argv`` layout.

    Returns:
        A ``SubtitleInfo`` populated from the ``episode``, ``episode_name``,
        ``subtitles``, ``episode_language`` and ``subtitles_language`` keys.
        Missing paths default to ``''``, a missing name to ``'Unknown'`` and
        missing languages to ``'English'``.
    """
    split_tokens = (token.partition('=') for token in args[1:] if '=' in token)
    options = {name: value for name, _, value in split_tokens}
    lookup = options.get
    return SubtitleInfo(
        episode_path=lookup('episode', ''),
        episode_name=lookup('episode_name', 'Unknown'),
        subtitle_path=lookup('subtitles', ''),
        episode_language=lookup('episode_language', 'English'),
        subtitles_language=lookup('subtitles_language', 'English'),
    )
def _scan_checkpoint(whisper, info, subtitles, audio_stream, start_sec):
    """Transcribe one audio window and derive a sync point from it.

    Prints the outcome (`` Locked: ...``, `` No Lock`` or `` Error: ...``) as
    the continuation of the progress line started by the caller.

    Args:
        whisper: Loaded ``WhisperModel`` used for transcription.
        info: ``SubtitleInfo`` describing the episode being processed.
        subtitles: Parsed subtitle cues to match the transcript against.
        audio_stream: Audio stream index obtained from ``MediaHandler``.
        start_sec: Window start, in seconds from the beginning of the media.

    Returns:
        An ``AnalysisPoint`` when at least ``SYNC_CONFIG['min_match_count']``
        matches were found; ``None`` when there was no lock or the window
        raised an error.
    """
    audio_file = None
    try:
        audio_file = MediaHandler.extract_audio_chunk(
            info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
        )
        segments, _ = whisper.transcribe(audio_file, vad_filter=True)
        # Segment times are in seconds relative to the chunk; convert to ms.
        w_segments = [
            WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text)
            for s in segments
        ]
        matches = TextMatcher.find_matches(subtitles, w_segments, int(start_sec * 1000))
        if len(matches) < SYNC_CONFIG['min_match_count']:
            print(" No Lock")
            return None

        offsets = [w_time - sub.start_ms for sub, w_time, _ in matches]
        median_offset = statistics.median(offsets)
        avg_sub_time = statistics.mean([sub.start_ms for sub, _, _ in matches])
        point = AnalysisPoint(avg_sub_time, median_offset, len(matches))
        print(f" Locked: {median_offset:+.0f}ms ({len(matches)} matches)")
        return point
    except Exception as e:
        # Deliberately broad: a single failing window must not abort the
        # whole scan, the remaining checkpoints can still produce a lock.
        print(f" Error: {e}")
        return None
    finally:
        # Always remove the extracted chunk, even when transcription failed.
        if audio_file and os.path.exists(audio_file):
            os.unlink(audio_file)


def _apply_correction(subtitles, mode, slope, intercept, anchors):
    """Re-time every cue in ``subtitles`` in place according to ``mode``.

    Args:
        subtitles: Cues exposing mutable ``start_ms`` / ``end_ms``.
        mode: ``"CONSTANT"``, ``"LINEAR"`` or ``"ELASTIC"``; with any other
            value the times are only truncated to ``int`` and clamped at 0.
        slope: Multiplier used by ``LINEAR``.
        intercept: Offset in ms used by ``CONSTANT`` and ``LINEAR``.
        anchors: Anchor points used by ``ELASTIC`` interpolation.
    """
    for sub in subtitles:
        new_start, new_end = sub.start_ms, sub.end_ms
        if mode == "CONSTANT":
            new_start += intercept
            new_end += intercept
        elif mode == "LINEAR":
            # Start and end are mapped independently, so durations scale too.
            new_start = (sub.start_ms * slope) + intercept
            new_end = (sub.end_ms * slope) + intercept
        elif mode == "ELASTIC":
            # One offset, sampled at the cue start, shifts both ends so the
            # cue keeps its original duration.
            off = Analyzer.get_interpolated_offset(sub.start_ms, anchors)
            new_start += off
            new_end += off
        sub.start_ms = max(0, int(new_start))
        sub.end_ms = max(0, int(new_end))


def main():
    """Synchronise a subtitle file to its episode audio using Whisper.

    Reads ``key=value`` arguments from ``sys.argv``, transcribes
    ``SYNC_CONFIG['sample_count']`` audio windows spread over the media,
    matches the transcripts against the subtitle cues to measure timing
    offsets, picks a correction mode (constant, linear or elastic) and
    rewrites the subtitle file in place.

    Returns ``None`` in every case. Failures (empty subtitle file, no usable
    sync points) are reported on stdout and leave the subtitle file untouched.
    """
    info = parse_bazarr_args(sys.argv)
    print(f"Target: {info.episode_name}")

    # 1. Init
    # Validate the subtitles first: there is no point probing the media or
    # loading the Whisper model when there is nothing to synchronise.
    subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
    if not subtitles:
        print("Error: Subtitle file is empty.")
        return

    audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
    media_duration = MediaHandler.get_media_duration(info.episode_path)
    print(f"Duration: {int(media_duration // 60)}m. Loading Whisper...")
    model_name = "base.en" if 'english' in info.episode_language.lower() else "base"
    whisper = WhisperModel(model_name, device="cpu", compute_type="int8", cpu_threads=4)

    # 2. Scanning Loop
    # Checkpoints start 30 s in and are spaced evenly over the duration minus
    # 60 s, which keeps them away from the very beginning and end.
    sample_count = SYNC_CONFIG['sample_count']
    usable_duration = media_duration - 60
    step = usable_duration / (sample_count + 1)
    sample_starts = [30 + (i * step) for i in range(sample_count)]

    raw_points = []
    print(f"\n--- Scanning {len(sample_starts)} Checkpoints ---")
    for start_sec in sample_starts:
        print(f"Scanning @ {int(start_sec // 60)}m...", end='', flush=True)
        point = _scan_checkpoint(whisper, info, subtitles, audio_stream, start_sec)
        if point is not None:
            raw_points.append(point)

    if not raw_points:
        print("FAILED: No sync points found.")
        return

    # 3. Decision
    raw_points.sort(key=lambda x: x.timestamp_ms)
    clean_points = Analyzer.filter_outliers(raw_points)
    if not clean_points:
        # Without this guard the median / anchor indexing below would raise.
        print("FAILED: No sync points left after outlier filtering.")
        return

    mode = SYNC_CONFIG['correction_method'].upper()
    if mode == "AUTO":
        # NOTE(review): the strategy is chosen from the unfiltered points
        # while the correction uses the filtered ones - confirm intended.
        mode = Analyzer.decide_sync_strategy(raw_points)
    elif mode == "FORCE_ELASTIC":
        mode = "ELASTIC"
    else:
        # Any other configured value falls back to a constant offset.
        mode = "CONSTANT"
    print(f"\n--- SYNC MODE: {mode} ---")

    final_slope = 1.0
    final_intercept = 0.0
    final_anchors = []
    if mode == "CONSTANT":
        final_intercept = statistics.median([p.offset_ms for p in clean_points])
        print(f"Applying Global Offset: {final_intercept:+.0f} ms")
    elif mode == "LINEAR":
        final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
        print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
    elif mode == "ELASTIC":
        anchors = Analyzer.smooth_points(clean_points)
        if not anchors:
            print("FAILED: No anchors available for elastic correction.")
            return
        # Pad with an anchor at 0 ms and one at the media end that reuse the
        # first / last measured offsets.
        final_anchors = (
            [AnalysisPoint(0, anchors[0].offset_ms, 0)]
            + anchors
            + [AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
        )
        print("Applying Non-Linear (Elastic) Map.")

    # 4. Apply
    _apply_correction(subtitles, mode, final_slope, final_intercept, final_anchors)
    SubtitleHandler.write_srt(info.subtitle_path, subtitles)
    print(f"Successfully synced {len(subtitles)} lines.")
if __name__ == '__main__':
    # Arguments supplied by the caller must be honoured. Previously sys.argv
    # was overwritten unconditionally, so the script always processed the
    # local sample below regardless of what was passed in.
    # The sample is now only a development fallback for runs started with no
    # arguments at all (e.g. straight from an IDE). Sibling samples (S03E01,
    # S03E07) use the same layout under the same test_data directory.
    if len(sys.argv) < 2:
        sys.argv = [
            'sync_script.py',
            'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',
            'episode_name=Superman & Lois - S03E05',
            'subtitles=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.en.srt',
            'episode_language=English',
            'subtitles_language=English',
        ]
    main()