commit 76581db30b7c15fa9497b98863f9c8b09f68094d
Author: Mathieu Broillet
Date:   Fri Jan 30 19:41:16 2026 +0100

    feat: init

diff --git a/config.py b/config.py
new file mode 100644
index 0000000..b529d49
--- /dev/null
+++ b/config.py
@@ -0,0 +1,19 @@
+# config.py
+
+SYNC_CONFIG = {
+    # Sampling
+    "sample_count": 20,
+    "scan_duration_sec": 45,
+
+    # Matching
+    "min_match_count": 3,
+    "min_match_score": 0.70,
+    "search_window_sec": 30,
+
+    # Logic & Decision Thresholds
+    "fix_drift": True,
+    "correction_method": "auto",  # Options: "auto", "constant", "force_elastic"
+    "jitter_tolerance_ms": 300,
+    "min_drift_slope": 0.00005,
+    "linear_r2_threshold": 0.80,
+}
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/core/analysis.py b/core/analysis.py
new file mode 100644
index 0000000..fea8ad1
--- /dev/null
+++ b/core/analysis.py
@@ -0,0 +1,104 @@
+import statistics
+from typing import List, Tuple
+
+from config import SYNC_CONFIG
+from .types import AnalysisPoint
+
+
+class Analyzer:
+    @staticmethod
+    def filter_outliers(points: List[AnalysisPoint]) -> List[AnalysisPoint]:
+        """IQR Filter to remove bad matches."""
+        if len(points) < 4: return points
+        offsets = sorted([p.offset_ms for p in points])
+        q1 = offsets[len(offsets) // 4]
+        q3 = offsets[3 * len(offsets) // 4]
+        iqr = q3 - q1
+        lower, upper = q1 - (1.5 * iqr), q3 + (1.5 * iqr)
+        return [p for p in points if lower <= p.offset_ms <= upper]
+
+    @staticmethod
+    def calculate_weighted_regression(points: List[AnalysisPoint]) -> Tuple[float, float, float]:
+        """Returns (Slope, Intercept, R2) weighted by match confidence."""
+        n = len(points)
+        if n < 2: return 1.0, 0.0, 0.0
+
+        x = [p.timestamp_ms for p in points]
+        y = [p.timestamp_ms + p.offset_ms for p in points]
+        w = [p.match_count for p in points]
+
+        sum_w = sum(w)
+        sum_wx = sum(wi * xi for wi, xi in zip(w, x))
+        sum_wy = sum(wi * yi for wi, yi in zip(w, y))
+        sum_wxx = sum(wi * xi * xi for wi, xi in zip(w, x))
+        sum_wxy = sum(wi * xi * yi for wi, xi, yi in zip(w, x, y))
+
+        denom = sum_w * sum_wxx - sum_wx * sum_wx
+        if denom == 0: return 1.0, 0.0, 0.0
+
+        slope = (sum_w * sum_wxy - sum_wx * sum_wy) / denom
+        intercept = (sum_wy - slope * sum_wx) / sum_w
+
+        # Unweighted R2
+        y_mean = sum(y) / n
+        ss_tot = sum((yi - y_mean) ** 2 for yi in y)
+        ss_res = sum((yi - (slope * xi + intercept)) ** 2 for xi, yi in zip(x, y))
+        r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
+
+        return slope, intercept, r2
+
+    @staticmethod
+    def smooth_points(points: List[AnalysisPoint]) -> List[AnalysisPoint]:
+        """Weighted smoothing for Elastic mode."""
+        if len(points) < 3: return points
+        points.sort(key=lambda p: p.timestamp_ms)
+        smoothed = [points[0]]
+        for i in range(1, len(points) - 1):
+            prev, curr, next_p = points[i - 1], points[i], points[i + 1]
+            avg_offset = (prev.offset_ms * 0.25) + (curr.offset_ms * 0.5) + (next_p.offset_ms * 0.25)
+            smoothed.append(AnalysisPoint(curr.timestamp_ms, avg_offset, curr.match_count))
+        smoothed.append(points[-1])
+        return smoothed
+
+    @staticmethod
+    def get_interpolated_offset(target_ms: int, anchors: List[AnalysisPoint]) -> float:
+        if target_ms <= anchors[0].timestamp_ms: return anchors[0].offset_ms
+        if target_ms >= anchors[-1].timestamp_ms: return anchors[-1].offset_ms
+
+        for i in range(len(anchors) - 1):
+            p1, p2 = anchors[i], anchors[i + 1]
+            if p1.timestamp_ms <= target_ms < p2.timestamp_ms:
+                alpha = (target_ms - p1.timestamp_ms) / (p2.timestamp_ms - p1.timestamp_ms)
+                return p1.offset_ms + (alpha * (p2.offset_ms - p1.offset_ms))
+        return anchors[0].offset_ms
+
+    @staticmethod
+    def decide_sync_strategy(points: List[AnalysisPoint]) -> str:
+        clean_points = Analyzer.filter_outliers(points)
+        if len(clean_points) < 2: return 'CONSTANT'
+
+        offsets = [p.offset_ms for p in clean_points]
+        std_dev = statistics.stdev(offsets) if len(offsets) > 1 else 0
+
+        print(f"\nAnalysis Metrics (Cleaned Data):")
+        print(f" Spread: {max(offsets) - min(offsets)}ms")
+        print(f" StdDev: {std_dev:.2f}ms")
+
+        if std_dev < SYNC_CONFIG['jitter_tolerance_ms']:
+            print(" Decision: Offsets are stable (Low Jitter).")
+            return 'CONSTANT'
+
+        if not SYNC_CONFIG['fix_drift']:
+            print(" Decision: Drift detected but 'fix_drift' is False.")
+            return 'CONSTANT'
+
+        slope, _, r2 = Analyzer.calculate_weighted_regression(clean_points)
+        drift_per_hour = abs(slope - 1.0) * 3600000
+        print(f" Linear Fit: R2={r2:.4f}, Slope={slope:.6f} (Drift: {drift_per_hour:.0f}ms/hr)")
+
+        if r2 >= SYNC_CONFIG['linear_r2_threshold'] and drift_per_hour > 100:
+            print(" Decision: Linear drift detected.")
+            return 'LINEAR'
+
+        print(" Decision: Variable/irregular drift.")
+        return 'ELASTIC'
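
A quick synthetic sanity check for the decision logic above; a minimal sketch, assuming it is run from the project root so config and core are importable, with invented data:

    from core.analysis import Analyzer
    from core.types import AnalysisPoint

    # 21 checkpoints across ~40 minutes with a steady 0.1% clock drift
    # (the offset grows by 1 ms per second of playback):
    points = [AnalysisPoint(t, int(t * 0.001), 5) for t in range(0, 2_400_001, 120_000)]

    print(Analyzer.decide_sync_strategy(points))  # expected: LINEAR
    slope, intercept, r2 = Analyzer.calculate_weighted_regression(points)
    print(f"slope={slope:.6f} r2={r2:.4f}")       # slope ~1.001, r2 ~1.0
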
diff --git a/core/matcher.py b/core/matcher.py
new file mode 100644
index 0000000..1ff4c5b
--- /dev/null
+++ b/core/matcher.py
@@ -0,0 +1,49 @@
+import re
+from difflib import SequenceMatcher
+from typing import List, Tuple
+
+from config import SYNC_CONFIG
+from .types import SubtitleEntry, WhisperSegment
+
+
+class TextMatcher:
+    @staticmethod
+    def normalize_text(text: str) -> str:
+        return re.sub(r'\s+', ' ', re.sub(r'[^\w\s]', '', text).lower().replace('\n', ' ')).strip()
+
+    @staticmethod
+    def text_similarity(text1: str, text2: str) -> float:
+        n1, n2 = TextMatcher.normalize_text(text1), TextMatcher.normalize_text(text2)
+        if not n1 or not n2: return 0.0
+        return SequenceMatcher(None, n1, n2).ratio()
+
+    @staticmethod
+    def find_matches(subtitles: List[SubtitleEntry], whisper_segments: List[WhisperSegment], chunk_start_ms: int) -> \
+            List[Tuple[SubtitleEntry, int, float]]:
+        matches = []
+        window = SYNC_CONFIG['search_window_sec'] * 1000
+        scan_dur = SYNC_CONFIG['scan_duration_sec'] * 1000
+
+        # Optimization: Pre-filter subtitles
+        relevant_subs = [
+            s for s in subtitles
+            if (chunk_start_ms - window) <= s.start_ms <= (chunk_start_ms + scan_dur + window)
+        ]
+
+        for w_seg in whisper_segments:
+            abs_start = w_seg.start_ms + chunk_start_ms
+            best_sub = None
+            best_score = 0.0
+
+            for sub in relevant_subs:
+                if not (abs_start - window <= sub.start_ms <= abs_start + window): continue
+                if len(sub.raw_text) < 3: continue
+
+                score = TextMatcher.text_similarity(sub.raw_text, w_seg.text)
+                if score > best_score:
+                    best_score = score
+                    best_sub = sub
+
+            if best_sub and best_score >= SYNC_CONFIG['min_match_score']:
+                matches.append((best_sub, abs_start, best_score))
+        return matches
diff --git a/core/media.py b/core/media.py
new file mode 100644
index 0000000..34c11a5
--- /dev/null
+++ b/core/media.py
@@ -0,0 +1,44 @@
+import json
+import os
+import subprocess
+import tempfile
+
+
+class MediaHandler:
+    @staticmethod
+    def get_media_duration(media_path: str) -> float:
+        cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of",
+               "default=noprint_wrappers=1:nokey=1", media_path]
+        try:
+            return float(subprocess.run(cmd, capture_output=True, text=True).stdout.strip())
+        except Exception:
+            return 3600.0
+
+    @staticmethod
+    def get_audio_stream_index(media_path: str, language: str) -> str:
+        lang_map = {'english': 'eng', 'french': 'fre', 'fra': 'fre', 'german': 'ger', 'spanish': 'spa',
+                    'italian': 'ita'}
+        target_iso = lang_map.get(language.lower(), 'eng')
+
+        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-select_streams", "a", media_path]
+        try:
+            data = json.loads(subprocess.run(cmd, capture_output=True, text=True).stdout)
+            for i, stream in enumerate(data.get('streams', [])):
+                if stream.get('tags', {}).get('language', 'und').lower() == target_iso:
+                    return f"0:a:{i}"
+            return "0:a:0"
+        except Exception:
+            return "0:a:0"
+
+    @staticmethod
+    def extract_audio_chunk(media_path: str, start_sec: int, duration_sec: int, stream_index: str) -> str:
+        fd, tmp_name = tempfile.mkstemp(suffix=".wav")
+        os.close(fd)
+
+        cmd = [
+            "ffmpeg", "-y", "-ss", str(start_sec), "-i", media_path,
+            "-map", stream_index, "-t", str(duration_sec),
+            "-ac", "1", "-ar", "16000", "-vn", "-loglevel", "error", tmp_name
+        ]
+        subprocess.run(cmd, check=True)
+        return tmp_name
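
A minimal check of the fuzzy matching above, assuming the project root is on sys.path; the strings are invented:

    from core.matcher import TextMatcher

    # Punctuation, case and runs of whitespace are normalized away,
    # so these two lines score a perfect 1.0:
    print(TextMatcher.text_similarity("Hello, world!", "hello   world"))

    # Unrelated text scores far below the 0.70 'min_match_score' gate:
    print(TextMatcher.text_similarity("Hello, world!", "Goodbye."))
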
diff --git a/core/subtitles.py b/core/subtitles.py
new file mode 100644
index 0000000..3c38571
--- /dev/null
+++ b/core/subtitles.py
@@ -0,0 +1,49 @@
+import os
+import re
+from typing import List
+
+from .types import SubtitleEntry
+
+
+class SubtitleHandler:
+    @staticmethod
+    def parse_time(t):
+        h, m, s, ms = int(t[:2]), int(t[3:5]), int(t[6:8]), int(t[9:])
+        return h * 3600000 + m * 60000 + s * 1000 + ms
+
+    @staticmethod
+    def format_time(ms):
+        ms = max(0, ms)
+        h, r = divmod(ms, 3600000)
+        m, r = divmod(r, 60000)
+        s, ms = divmod(r, 1000)
+        return f"{h:02}:{m:02}:{s:02},{ms:03}"
+
+    @staticmethod
+    def parse_srt(filepath: str) -> List[SubtitleEntry]:
+        if not os.path.exists(filepath): return []
+        encodings = ['utf-8-sig', 'utf-8', 'latin-1']
+        content = ""
+        for enc in encodings:
+            try:
+                with open(filepath, 'r', encoding=enc) as f:
+                    content = f.read()
+                break
+            except UnicodeDecodeError:
+                continue
+
+        entries = []
+        # \r?\n keeps the pattern working for CRLF files too, matching the lookahead below
+        pattern = re.compile(r'(\d+)\r?\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\r?\n((?:(?!\r?\n\r?\n).)*)',
+                             re.DOTALL)
+        for match in pattern.finditer(content):
+            start = SubtitleHandler.parse_time(match.group(2))
+            end = SubtitleHandler.parse_time(match.group(3))
+            entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip()))
+        return entries
+
+    @staticmethod
+    def write_srt(filepath: str, entries: List[SubtitleEntry]):
+        with open(filepath, 'w', encoding='utf-8') as f:
+            for entry in entries:
+                f.write(
+                    f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
diff --git a/core/types.py b/core/types.py
new file mode 100644
index 0000000..34ea277
--- /dev/null
+++ b/core/types.py
@@ -0,0 +1,32 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class SubtitleEntry:
+    index: int
+    start_ms: int
+    end_ms: int
+    raw_text: str
+
+
+@dataclass
+class WhisperSegment:
+    start_ms: int
+    end_ms: int
+    text: str
+
+
+@dataclass
+class AnalysisPoint:
+    timestamp_ms: float
+    offset_ms: float
+    match_count: int
+
+
+@dataclass
+class SubtitleInfo:
+    episode_path: str
+    episode_name: str
+    subtitle_path: str
+    episode_language: str
+    subtitles_language: str
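
A round-trip sketch for the SRT timestamp helpers above, with invented values:

    from core.subtitles import SubtitleHandler

    ms = SubtitleHandler.parse_time("01:02:03,456")
    print(ms)                               # 3723456
    print(SubtitleHandler.format_time(ms))  # 01:02:03,456
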
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..c38958d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,169 @@
+import os
+import statistics
+import sys
+
+from faster_whisper import WhisperModel
+
+from config import SYNC_CONFIG
+from core.analysis import Analyzer
+from core.matcher import TextMatcher
+from core.media import MediaHandler
+from core.subtitles import SubtitleHandler
+from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint
+
+
+def parse_bazarr_args(args) -> SubtitleInfo:
+    arg_dict = {}
+    for arg in args[1:]:
+        if '=' in arg:
+            key, value = arg.split('=', 1)
+            arg_dict[key] = value
+    return SubtitleInfo(
+        episode_path=arg_dict.get('episode', ''),
+        episode_name=arg_dict.get('episode_name', 'Unknown'),
+        subtitle_path=arg_dict.get('subtitles', ''),
+        episode_language=arg_dict.get('episode_language', 'English'),
+        subtitles_language=arg_dict.get('subtitles_language', 'English')
+    )
+
+
+def main():
+    info = parse_bazarr_args(sys.argv)
+    print(f"Target: {info.episode_name}")
+
+    # 1. Init
+    audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
+    media_duration = MediaHandler.get_media_duration(info.episode_path)
+
+    print(f"Duration: {int(media_duration // 60)}m. Loading Whisper...")
+    model_name = "base.en" if 'english' in info.episode_language.lower() else "base"
+    whisper = WhisperModel(model_name, device="cpu", compute_type="int8", cpu_threads=4)
+
+    subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
+    if not subtitles:
+        print("Error: Subtitle file is empty.")
+        return
+
+    # 2. Scanning Loop
+    usable_duration = media_duration - 60
+    step = usable_duration / (SYNC_CONFIG['sample_count'] + 1)
+    sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])]
+
+    raw_points = []
+    print(f"\n--- Scanning {len(sample_starts)} Checkpoints ---")
+
+    for start_sec in sample_starts:
+        print(f"Scanning @ {int(start_sec // 60)}m...", end='', flush=True)
+        audio_file = None
+        try:
+            audio_file = MediaHandler.extract_audio_chunk(
+                info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
+            )
+
+            segments, _ = whisper.transcribe(audio_file, vad_filter=True)
+            w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)]
+
+            matches = TextMatcher.find_matches(subtitles, w_segments, int(start_sec * 1000))
+            if len(matches) >= SYNC_CONFIG['min_match_count']:
+                offsets = [w_time - sub.start_ms for sub, w_time, _ in matches]
+                median_offset = statistics.median(offsets)
+                avg_sub_time = statistics.mean([sub.start_ms for sub, _, _ in matches])
+                raw_points.append(AnalysisPoint(avg_sub_time, median_offset, len(matches)))
+                print(f" Locked: {median_offset:+.0f}ms ({len(matches)} matches)")
+            else:
+                print(f" No Lock")
+        except Exception as e:
+            print(f" Error: {e}")
+        finally:
+            if audio_file and os.path.exists(audio_file):
+                os.unlink(audio_file)
+
+    if not raw_points:
+        print("FAILED: No sync points found.")
+        return
+
+    # 3. Decision
+    raw_points.sort(key=lambda x: x.timestamp_ms)
+    clean_points = Analyzer.filter_outliers(raw_points)
+
+    mode = SYNC_CONFIG['correction_method'].upper()
+    if mode == "AUTO":
+        mode = Analyzer.decide_sync_strategy(raw_points)
+    elif mode == "FORCE_ELASTIC":
+        mode = "ELASTIC"
+    else:
+        mode = "CONSTANT"
+
+    print(f"\n--- SYNC MODE: {mode} ---")
+
+    final_slope = 1.0
+    final_intercept = 0.0
+    final_anchors = []
+
+    if mode == "CONSTANT":
+        final_intercept = statistics.median([p.offset_ms for p in clean_points])
+        print(f"Applying Global Offset: {final_intercept:+.0f} ms")
+
+    elif mode == "LINEAR":
+        final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
+        print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
+
+    elif mode == "ELASTIC":
+        anchors = Analyzer.smooth_points(clean_points)
+        final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \
+                        [AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
+        print("Applying Non-Linear (Elastic) Map.")
+
+    # 4. Apply
+    count = 0
+    for sub in subtitles:
+        new_start, new_end = sub.start_ms, sub.end_ms
+
+        if mode == "CONSTANT":
+            new_start += final_intercept
+            new_end += final_intercept
+        elif mode == "LINEAR":
+            new_start = (sub.start_ms * final_slope) + final_intercept
+            new_end = (sub.end_ms * final_slope) + final_intercept
+        elif mode == "ELASTIC":
+            off = Analyzer.get_interpolated_offset(sub.start_ms, final_anchors)
+            new_start += off
+            new_end += off
+
+        sub.start_ms = max(0, int(new_start))
+        sub.end_ms = max(0, int(new_end))
+        count += 1
+
+    SubtitleHandler.write_srt(info.subtitle_path, subtitles)
+    print(f"Successfully synced {count} lines.")
+
+
+if __name__ == '__main__':
+    # sys.argv = [
+    # 'sync_script.py',
+    # 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E01/Superman & Lois - S03E01 - Closer Bluray-1080p.mkv',
+    # 'episode_name=Superman & Lois - S03E01',
+    # 'subtitles=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E01/Superman & Lois - S03E01 - Closer Bluray-1080p.en.hi.srt',
+    # 'episode_language=English',
+    # 'subtitles_language=English'
+    # ]
+
+    # sys.argv = [
+    # 'sync_script.py',
+    # 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E07/Superman & Lois - S03E07 - Forever And Always Bluray-1080p.mkv',
+    # 'episode_name=Superman & Lois - S03E07',
+    # 'subtitles=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E07/Superman & Lois - S03E07 - Forever And Always Bluray-1080p.en.srt',
+    # 'episode_language=English',
+    # 'subtitles_language=English'
+    # ]
+
+    # sys.argv = [
+    # 'sync_script.py',
+    # 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',
+    # 'episode_name=Superman & Lois - S03E05',
+    # 'subtitles=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.en.srt',
+    # 'episode_language=English',
+    # 'subtitles_language=English'
+    # ]
+
+    main()
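
With the test overrides above commented out, the script reads Bazarr-style key=value arguments from the command line, as parsed by parse_bazarr_args. A hypothetical invocation (the paths are placeholders):

    python main.py \
        'episode=/path/to/Show - S01E01.mkv' \
        'episode_name=Show - S01E01' \
        'subtitles=/path/to/Show - S01E01.en.srt' \
        'episode_language=English' \
        'subtitles_language=English'
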
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..591bf8d
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+faster-whisper
\ No newline at end of file
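
A setup sketch, assuming a POSIX shell; note that ffmpeg and ffprobe are invoked as external binaries by core/media.py, so they are not Python dependencies and must be on PATH:

    pip install -r requirements.txt      # installs faster-whisper, which main.py imports as faster_whisper
    ffmpeg -version && ffprobe -version  # confirm the external tools are available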