Compare commits

2 Commits

6 changed files with 180 additions and 41 deletions

README.md (new file)

@@ -0,0 +1,109 @@
# Whisper-Powered Subtitle Synchronization

**A smart subtitle synchronization tool powered by OpenAI's Whisper.**

This tool automatically detects and fixes desynchronized subtitles by listening to the audio track of your media. Unlike standard tools that only apply a fixed time shift, this project detects **Non-Linear Drift**, **Framerate Mismatches**, and **Variable Speed** issues, applying an "Elastic" correction map to align subtitles from start to finish.

Designed to work as a standalone CLI tool or as a **Bazarr** post-processing script.

> [!NOTE]
> Generative AI has been used during the development of this project.
---
## Installation
### 1. Prerequisites
* **Python 3.9+**
* **FFmpeg:** Must be installed and accessible in your system PATH (a quick check is shown after this list).
* *Linux:* `sudo apt install ffmpeg`
* *Windows:* Download binaries and add to PATH.
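
A quick way to confirm both prerequisites before installing (a minimal sketch using only the Python standard library):

```python
# Illustrative prerequisite check: confirms the interpreter version and that
# the ffmpeg binary is reachable on PATH.
import shutil
import sys

assert sys.version_info >= (3, 9), "Python 3.9+ is required"
assert shutil.which("ffmpeg") is not None, "ffmpeg was not found on PATH"
print("Prerequisites look good.")
```
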
### 2. Clone & Install
```bash
git clone <url of this repo>
cd <repo folder>
# (Optional) Create a virtual environment
python -m venv venv
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install dependencies
pip install -r requirements.txt
```
---
## Configuration
All settings are located in `config.py`. You can tweak them to balance speed against accuracy; the most important ones are:
```python
SYNC_CONFIG = {
"device": "cpu", # Use 'cuda' if you have an NVIDIA GPU
"compute_type": "int8", # Use 'float16' for GPU
"sample_count": 25, # How many points to check (higher = more accurate curve)
"scan_duration_sec": 60, # The length of each audio chunk to transcribe (higher = more data, slower)
"correction_method": "auto" # "auto", "constant", or "force_elastic"
}
```
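
On a machine with an NVIDIA GPU, the comments above suggest changing only `device` and `compute_type`. A minimal sketch of that variant (every other key keeps the default shown above):

```python
# Hypothetical GPU variant of SYNC_CONFIG in config.py: only "device" and
# "compute_type" differ from the CPU defaults above.
SYNC_CONFIG = {
    "device": "cuda",           # NVIDIA GPU
    "compute_type": "float16",  # half precision, as suggested for GPU
    "sample_count": 25,
    "scan_duration_sec": 60,
    "correction_method": "auto"
}
```
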
---
## How It Works
1. **Extract:** The tool extracts small audio chunks (e.g., 60 seconds) at regular intervals (Checkpoints) throughout the media file.
2. **Transcribe:** It uses Whisper to transcribe the speech in those chunks.
3. **Match:** It fuzzy-matches the transcribed text against the subtitle file to find the *actual* timestamp vs the *subtitle* timestamp.
4. **Analyze:**
   - If offsets are stable → apply a **Global Offset**.
   - If offsets drift linearly → apply a **Linear Regression** (Slope correction).
   - If offsets are chaotic → generate an **Elastic Map** (Piecewise Interpolation; see the sketch after this list).
5. **Apply:** The subtitles are rewritten with the corrected timings.
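
The elastic case comes down to piecewise linear interpolation of the measured offset between checkpoints. The snippet below is a minimal, self-contained illustration of that idea rather than the project's actual implementation; the anchor values are invented for the example:

```python
# Illustrative "elastic map": piecewise linear interpolation of the subtitle
# offset between measured checkpoints. Anchor values are made up.
from bisect import bisect_right

# (media timestamp in ms, measured offset in ms) at each checkpoint
anchors = [(0, -200), (600_000, -350), (1_200_000, -900), (2_400_000, -1500)]

def elastic_offset(t_ms: int) -> float:
    """Return the offset to apply at media time t_ms."""
    times = [t for t, _ in anchors]
    i = bisect_right(times, t_ms)
    if i == 0:
        return anchors[0][1]            # before the first anchor: clamp
    if i == len(anchors):
        return anchors[-1][1]           # after the last anchor: clamp
    (t0, off0), (t1, off1) = anchors[i - 1], anchors[i]
    frac = (t_ms - t0) / (t1 - t0)      # position between the two anchors
    return off0 + frac * (off1 - off0)

# Example: correct a cue that starts 15 minutes in, assuming the offset is
# defined as "milliseconds to add to the subtitle timestamp".
new_start_ms = 900_000 + elastic_offset(900_000)
```
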
---
## Usage
### Command Line (Manual)
You can run the script manually by mimicking the Bazarr argument format:
```bash
python main.py \
episode="/path/to/movie.mkv" \
episode_name="My Movie" \
subtitles="/path/to/subs.srt" \
episode_language="English" \
subtitles_language="English"
```
### Integration with Bazarr
> [!CAUTION]
> Not yet tested.

This tool is designed to be used as a "Custom Script" in Bazarr.
1. Go to **Bazarr > Settings > Subtitles > Post-Processing**.
2. Enable **"Execute a custom script"**.
3. **Command:**
```bash
python /path/to/script/main.py
```
4. **Arguments:**
```text
episode="{{episode}}" episode_name="{{episode_name}}" subtitles="{{subtitles}}" episode_language="{{episode_language}}" subtitles_language="{{subtitles_language}}"
```
*(Note: Bazarr passes these variables automatically.)*

config.py

```diff
@@ -23,4 +23,6 @@ SYNC_CONFIG = {
     "jitter_tolerance_ms": 300,
     "min_drift_slope": 0.00005,
     "linear_r2_threshold": 0.80,
-}
+
+    "create_backup": True,
+}
```

core/subtitles.py

```diff
@@ -1,7 +1,9 @@
 import os
 import re
+import shutil
 
 from typing import List
 
+from config import SYNC_CONFIG
 from .types import SubtitleEntry
@@ -41,9 +43,28 @@ class SubtitleHandler:
             entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip()))
         return entries
 
+    @staticmethod
+    def create_backup(filepath: str):
+        """Creates a .bak copy of the subtitles if one doesn't exist."""
+        backup_path = filepath + ".bak"
+        if not os.path.exists(backup_path):
+            try:
+                shutil.copy2(filepath, backup_path)
+                print(f"Backup created: {os.path.basename(backup_path)}")
+            except IOError as e:
+                print(f"Warning: Could not create backup: {e}")
+
     @staticmethod
     def write_srt(filepath: str, entries: List[SubtitleEntry]):
-        with open(filepath, 'w', encoding='utf-8') as f:
-            for entry in entries:
-                f.write(
-                    f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
+        # 1. Ensure backup exists before overwriting
+        if SYNC_CONFIG['create_backup']:
+            SubtitleHandler.create_backup(filepath)
+
+        # 2. Overwrite
+        try:
+            with open(filepath, 'w', encoding='utf-8') as f:
+                for entry in entries:
+                    f.write(
+                        f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
+        except IOError as e:
+            print(f"Error writing subtitle file: {e}")
```

core/utils.py (new file)

@@ -0,0 +1,18 @@
```python
from .types import SubtitleInfo


def parse_bazarr_args(args: list) -> SubtitleInfo:
    """Parses key=value arguments passed by Bazarr."""
    arg_dict = {}
    for arg in args[1:]:
        if '=' in arg:
            key, value = arg.split('=', 1)
            arg_dict[key] = value.strip('"').strip("'")  # Clean quotes if present
    return SubtitleInfo(
        episode_path=arg_dict.get('episode', ''),
        episode_name=arg_dict.get('episode_name', 'Unknown'),
        subtitle_path=arg_dict.get('subtitles', ''),
        episode_language=arg_dict.get('episode_language', 'English'),
        subtitles_language=arg_dict.get('subtitles_language', 'English')
    )
```

main.py

```diff
@@ -9,40 +9,28 @@ from core.analysis import Analyzer
 from core.matcher import TextMatcher
 from core.media import MediaHandler
 from core.subtitles import SubtitleHandler
-from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint
+from core.types import WhisperSegment, AnalysisPoint
+from core.utils import parse_bazarr_args
 
 
-def parse_bazarr_args(args) -> SubtitleInfo:
-    arg_dict = {}
-    for arg in args[1:]:
-        if '=' in arg:
-            key, value = arg.split('=', 1)
-            arg_dict[key] = value
-    return SubtitleInfo(
-        episode_path=arg_dict.get('episode', ''),
-        episode_name=arg_dict.get('episode_name', 'Unknown'),
-        subtitle_path=arg_dict.get('subtitles', ''),
-        episode_language=arg_dict.get('episode_language', 'English'),
-        subtitles_language=arg_dict.get('subtitles_language', 'English')
-    )
-
-
 def main():
+    # 1. Parse Arguments
     info = parse_bazarr_args(sys.argv)
+    if not info.subtitle_path or not os.path.exists(info.subtitle_path):
+        print("Error: Invalid or missing subtitle path.")
+        return
+
     print(f"Target: {info.episode_name}")
 
-    # 1. Init
+    # 2. Initialize Resources
     audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
     media_duration = MediaHandler.get_media_duration(info.episode_path)
-    # Get the 2-letter code (e.g., "en")
-    whisper_lang_code = MediaHandler.get_language_code(info.episode_language)
+    whisper_lang = MediaHandler.get_language_code(info.episode_language)
 
     print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...")
-    # Load model based on Config
-    model_name = "base.en" if whisper_lang_code == 'en' else "base"
+    # Load Model
+    model_name = "base.en" if whisper_lang == 'en' else "base"
     try:
         whisper = WhisperModel(
             model_name,
@@ -51,15 +39,12 @@ def main():
             cpu_threads=4
         )
     except Exception as e:
-        print(f"Error loading model: {e}")
+        print(f"CRITICAL: Failed to load Whisper model: {e}")
         return
 
     subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
-    if not subtitles:
-        print("Error: Subtitle file is empty.")
-        return
 
-    # 2. Scanning Loop
+    # 3. Scanning Loop
     usable_duration = media_duration - 60
     step = usable_duration / (SYNC_CONFIG['sample_count'] + 1)
     sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])]
@@ -75,14 +60,14 @@
             info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
         )
 
+        # Optimized Transcribe
         segments, _ = whisper.transcribe(
            audio_file,
            vad_filter=SYNC_CONFIG['vad_filter'],
            vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']),
-           language=whisper_lang_code,
+           language=whisper_lang,
            beam_size=SYNC_CONFIG['beam_size'],
-           condition_on_previous_text=False,
-           word_timestamps=False
+           condition_on_previous_text=False
         )
 
         w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)]
@@ -103,10 +88,10 @@
         os.unlink(audio_file)
 
     if not raw_points:
-        print("FAILED: No sync points found.")
+        print("FAILED: No sync points found. Exiting.")
         return
 
-    # 3. Decision
+    # 4. Analysis & Decision
     raw_points.sort(key=lambda x: x.timestamp_ms)
     clean_points = Analyzer.filter_outliers(raw_points)
@@ -128,17 +113,22 @@
         final_intercept = statistics.median([p.offset_ms for p in clean_points])
         print(f"Applying Global Offset: {final_intercept:+.0f} ms")
 
+        if abs(final_intercept) < 50:
+            print("Offset is negligible. No changes needed.")
+            return
+
     elif mode == "LINEAR":
         final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
         print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
 
     elif mode == "ELASTIC":
         anchors = Analyzer.smooth_points(clean_points)
+        # Extend anchors to cover 0 to End
         final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \
                         [AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
         print("Applying Non-Linear (Elastic) Map.")
 
-    # 4. Apply
+    # 5. Application
     count = 0
     for sub in subtitles:
         new_start, new_end = sub.start_ms, sub.end_ms
@@ -190,7 +180,6 @@ if __name__ == '__main__':
         'subtitles_language=English'
     ]
 
     # sys.argv = [
-    #     'sync_script.py',
     #     'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',
```

requirements.txt

```diff
@@ -1 +1 @@
-git+https://github.com/absadiki/pywhispercpp
+faster-whisper
```