Compare commits
2 Commits
e1a7b69f76
...
c7c4f9a0aa
| Author | SHA1 | Date | |
|---|---|---|---|
|
c7c4f9a0aa
|
|||
|
2a7f1a526d
|
109
README.md
Normal file
109
README.md
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
# Whisper-Powered Subtitle Synchronization
|
||||||
|
|
||||||
|
**A smart subtitle synchronization tool powered by (OpenAI's) Whisper.**
|
||||||
|
|
||||||
|
This tool automatically detects and fixes desynchronized subtitles by listening to the audio track of your media. Unlike standard tools that only apply a fixed time shift, this project detects **Non-Linear Drift**, **Framerate Mismatches**, and **Variable Speed** issues, applying an "Elastic" correction map to perfectly align subtitles from start to finish.
|
||||||
|
|
||||||
|
Designed to work as a standalone CLI tool or a **Bazarr** post-processing script.
|
||||||
|
|
||||||
|
> [!NOTE]
|
||||||
|
> Generative AI has been used during the development of this project.
|
||||||
|
---
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
### 1. Prerequisites
|
||||||
|
|
||||||
|
* **Python 3.9+**
|
||||||
|
* **FFmpeg:** Must be installed and accessible in your system PATH.
|
||||||
|
* *Linux:* `sudo apt install ffmpeg`
|
||||||
|
* *Windows:* Download binaries and add to PATH.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### 2. Clone & Install
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git clone <url of this repo>
|
||||||
|
cd <repo folder>
|
||||||
|
|
||||||
|
# (Optional) Create a virtual environment
|
||||||
|
python -m venv venv
|
||||||
|
source venv/bin/activate # or venv\Scripts\activate on Windows
|
||||||
|
|
||||||
|
# Install dependencies
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
All settings are located in `config.py`. You can tweak these to balance speed vs. accuracy; the most important ones are:
|
||||||
|
|
||||||
|
```python
|
||||||
|
SYNC_CONFIG = {
|
||||||
|
"device": "cpu", # Use 'cuda' if you have an NVIDIA GPU
|
||||||
|
"compute_type": "int8", # Use 'float16' for GPU
|
||||||
|
"sample_count": 25, # How many points to check (higher = more accurate curve)
|
||||||
|
"scan_duration_sec": 60, # The length of each audio chunk to transcribe (higher = more data, slower)
|
||||||
|
"correction_method": "auto" # "auto", "constant", or "force_elastic"
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## How It Works
|
||||||
|
|
||||||
|
1. **Extract:** The tool extracts small audio chunks (e.g., 60 seconds) at regular intervals (Checkpoints) throughout the media file.
|
||||||
|
2. **Transcribe:** It uses Whisper to transcribe the speech in those chunks.
|
||||||
|
3. **Match:** It fuzzy-matches the transcribed text against the subtitle file to find the *actual* timestamp vs the *subtitle* timestamp.
|
||||||
|
4. **Analyze:**
|
||||||
|
- If offsets are stable → Apply **Global Offset**.
|
||||||
|
- If offsets drift linearly → Apply **Linear Regression** (Slope correction).
|
||||||
|
- If offsets are chaotic → Generate an **Elastic Map** (Piecewise Interpolation).
|
||||||
|
|
||||||
|
|
||||||
|
5. **Apply:** The subtitles are rewritten with the corrected timings.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
### Command Line (Manual)
|
||||||
|
|
||||||
|
You can run the script manually by mimicking the Bazarr argument format:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python main.py \
|
||||||
|
episode="/path/to/movie.mkv" \
|
||||||
|
episode_name="My Movie" \
|
||||||
|
subtitles="/path/to/subs.srt" \
|
||||||
|
episode_language="English" \
|
||||||
|
subtitles_language="English"
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
### Integration with Bazarr
|
||||||
|
|
||||||
|
> [!CAUTION]
|
||||||
|
> The Bazarr integration has not been tested yet.
|
||||||
|
|
||||||
|
This tool is designed to be a "Custom Script" in Bazarr.
|
||||||
|
|
||||||
|
1. Go to **Bazarr > Settings > Subtitles > Post-Processing**.
|
||||||
|
2. Enable **"Execute a custom script"**.
|
||||||
|
3. **Command:**
|
||||||
|
```bash
|
||||||
|
python /path/to/script/main.py
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
4. **Arguments:**
|
||||||
|
```text
|
||||||
|
episode="{{episode}}" episode_name="{{episode_name}}" subtitles="{{subtitles}}" episode_language="{{episode_language}}" subtitles_language="{{subtitles_language}}"
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
*(Note: Bazarr passes these variables automatically).*
|
||||||
@@ -23,4 +23,6 @@ SYNC_CONFIG = {
|
|||||||
"jitter_tolerance_ms": 300,
|
"jitter_tolerance_ms": 300,
|
||||||
"min_drift_slope": 0.00005,
|
"min_drift_slope": 0.00005,
|
||||||
"linear_r2_threshold": 0.80,
|
"linear_r2_threshold": 0.80,
|
||||||
}
|
|
||||||
|
"create_backup": True,
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import shutil
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
|
from config import SYNC_CONFIG
|
||||||
from .types import SubtitleEntry
|
from .types import SubtitleEntry
|
||||||
|
|
||||||
|
|
||||||
@@ -41,9 +43,28 @@ class SubtitleHandler:
|
|||||||
entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip()))
|
entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip()))
|
||||||
return entries
|
return entries
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create_backup(filepath: str):
|
||||||
|
"""Creates a .bak copy of the subtitles if one doesn't exist."""
|
||||||
|
backup_path = filepath + ".bak"
|
||||||
|
if not os.path.exists(backup_path):
|
||||||
|
try:
|
||||||
|
shutil.copy2(filepath, backup_path)
|
||||||
|
print(f"Backup created: {os.path.basename(backup_path)}")
|
||||||
|
except IOError as e:
|
||||||
|
print(f"Warning: Could not create backup: {e}")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def write_srt(filepath: str, entries: List[SubtitleEntry]):
|
def write_srt(filepath: str, entries: List[SubtitleEntry]):
|
||||||
with open(filepath, 'w', encoding='utf-8') as f:
|
# 1. Ensure backup exists before overwriting
|
||||||
for entry in entries:
|
if SYNC_CONFIG['create_backup']:
|
||||||
f.write(
|
SubtitleHandler.create_backup(filepath)
|
||||||
f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
|
|
||||||
|
# 2. Overwrite
|
||||||
|
try:
|
||||||
|
with open(filepath, 'w', encoding='utf-8') as f:
|
||||||
|
for entry in entries:
|
||||||
|
f.write(
|
||||||
|
f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
|
||||||
|
except IOError as e:
|
||||||
|
print(f"Error writing subtitle file: {e}")
|
||||||
|
|||||||
18
core/utils.py
Normal file
18
core/utils.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
from .types import SubtitleInfo
|
||||||
|
|
||||||
|
|
||||||
|
def parse_bazarr_args(args: list) -> SubtitleInfo:
|
||||||
|
"""Parses key=value arguments passed by Bazarr."""
|
||||||
|
arg_dict = {}
|
||||||
|
for arg in args[1:]:
|
||||||
|
if '=' in arg:
|
||||||
|
key, value = arg.split('=', 1)
|
||||||
|
arg_dict[key] = value.strip('"').strip("'") # Clean quotes if present
|
||||||
|
|
||||||
|
return SubtitleInfo(
|
||||||
|
episode_path=arg_dict.get('episode', ''),
|
||||||
|
episode_name=arg_dict.get('episode_name', 'Unknown'),
|
||||||
|
subtitle_path=arg_dict.get('subtitles', ''),
|
||||||
|
episode_language=arg_dict.get('episode_language', 'English'),
|
||||||
|
subtitles_language=arg_dict.get('subtitles_language', 'English')
|
||||||
|
)
|
||||||
59
main.py
59
main.py
@@ -9,40 +9,28 @@ from core.analysis import Analyzer
|
|||||||
from core.matcher import TextMatcher
|
from core.matcher import TextMatcher
|
||||||
from core.media import MediaHandler
|
from core.media import MediaHandler
|
||||||
from core.subtitles import SubtitleHandler
|
from core.subtitles import SubtitleHandler
|
||||||
from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint
|
from core.types import WhisperSegment, AnalysisPoint
|
||||||
|
from core.utils import parse_bazarr_args
|
||||||
|
|
||||||
def parse_bazarr_args(args) -> SubtitleInfo:
|
|
||||||
arg_dict = {}
|
|
||||||
for arg in args[1:]:
|
|
||||||
if '=' in arg:
|
|
||||||
key, value = arg.split('=', 1)
|
|
||||||
arg_dict[key] = value
|
|
||||||
return SubtitleInfo(
|
|
||||||
episode_path=arg_dict.get('episode', ''),
|
|
||||||
episode_name=arg_dict.get('episode_name', 'Unknown'),
|
|
||||||
subtitle_path=arg_dict.get('subtitles', ''),
|
|
||||||
episode_language=arg_dict.get('episode_language', 'English'),
|
|
||||||
subtitles_language=arg_dict.get('subtitles_language', 'English')
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
# 1. Parse Arguments
|
||||||
info = parse_bazarr_args(sys.argv)
|
info = parse_bazarr_args(sys.argv)
|
||||||
|
if not info.subtitle_path or not os.path.exists(info.subtitle_path):
|
||||||
|
print("Error: Invalid or missing subtitle path.")
|
||||||
|
return
|
||||||
|
|
||||||
print(f"Target: {info.episode_name}")
|
print(f"Target: {info.episode_name}")
|
||||||
|
|
||||||
# 1. Init
|
# 2. Initialize Resources
|
||||||
audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
|
audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
|
||||||
media_duration = MediaHandler.get_media_duration(info.episode_path)
|
media_duration = MediaHandler.get_media_duration(info.episode_path)
|
||||||
|
whisper_lang = MediaHandler.get_language_code(info.episode_language)
|
||||||
# Get the 2-letter code (e.g., "en")
|
|
||||||
whisper_lang_code = MediaHandler.get_language_code(info.episode_language)
|
|
||||||
|
|
||||||
print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...")
|
print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...")
|
||||||
|
|
||||||
# Load model based on Config
|
# Load Model
|
||||||
model_name = "base.en" if whisper_lang_code == 'en' else "base"
|
model_name = "base.en" if whisper_lang == 'en' else "base"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
whisper = WhisperModel(
|
whisper = WhisperModel(
|
||||||
model_name,
|
model_name,
|
||||||
@@ -51,15 +39,12 @@ def main():
|
|||||||
cpu_threads=4
|
cpu_threads=4
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading model: {e}")
|
print(f"CRITICAL: Failed to load Whisper model: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
|
subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
|
||||||
if not subtitles:
|
|
||||||
print("Error: Subtitle file is empty.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 2. Scanning Loop
|
# 3. Scanning Loop
|
||||||
usable_duration = media_duration - 60
|
usable_duration = media_duration - 60
|
||||||
step = usable_duration / (SYNC_CONFIG['sample_count'] + 1)
|
step = usable_duration / (SYNC_CONFIG['sample_count'] + 1)
|
||||||
sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])]
|
sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])]
|
||||||
@@ -75,14 +60,14 @@ def main():
|
|||||||
info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
|
info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Optimized Transcribe
|
||||||
segments, _ = whisper.transcribe(
|
segments, _ = whisper.transcribe(
|
||||||
audio_file,
|
audio_file,
|
||||||
vad_filter=SYNC_CONFIG['vad_filter'],
|
vad_filter=SYNC_CONFIG['vad_filter'],
|
||||||
vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']),
|
vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']),
|
||||||
language=whisper_lang_code,
|
language=whisper_lang,
|
||||||
beam_size=SYNC_CONFIG['beam_size'],
|
beam_size=SYNC_CONFIG['beam_size'],
|
||||||
condition_on_previous_text=False,
|
condition_on_previous_text=False
|
||||||
word_timestamps=False
|
|
||||||
)
|
)
|
||||||
|
|
||||||
w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)]
|
w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)]
|
||||||
@@ -103,10 +88,10 @@ def main():
|
|||||||
os.unlink(audio_file)
|
os.unlink(audio_file)
|
||||||
|
|
||||||
if not raw_points:
|
if not raw_points:
|
||||||
print("FAILED: No sync points found.")
|
print("FAILED: No sync points found. Exiting.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 3. Decision
|
# 4. Analysis & Decision
|
||||||
raw_points.sort(key=lambda x: x.timestamp_ms)
|
raw_points.sort(key=lambda x: x.timestamp_ms)
|
||||||
clean_points = Analyzer.filter_outliers(raw_points)
|
clean_points = Analyzer.filter_outliers(raw_points)
|
||||||
|
|
||||||
@@ -128,17 +113,22 @@ def main():
|
|||||||
final_intercept = statistics.median([p.offset_ms for p in clean_points])
|
final_intercept = statistics.median([p.offset_ms for p in clean_points])
|
||||||
print(f"Applying Global Offset: {final_intercept:+.0f} ms")
|
print(f"Applying Global Offset: {final_intercept:+.0f} ms")
|
||||||
|
|
||||||
|
if abs(final_intercept) < 50:
|
||||||
|
print("Offset is negligible. No changes needed.")
|
||||||
|
return
|
||||||
|
|
||||||
elif mode == "LINEAR":
|
elif mode == "LINEAR":
|
||||||
final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
|
final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
|
||||||
print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
|
print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
|
||||||
|
|
||||||
elif mode == "ELASTIC":
|
elif mode == "ELASTIC":
|
||||||
anchors = Analyzer.smooth_points(clean_points)
|
anchors = Analyzer.smooth_points(clean_points)
|
||||||
|
# Extend anchors to cover 0 to End
|
||||||
final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \
|
final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \
|
||||||
[AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
|
[AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
|
||||||
print("Applying Non-Linear (Elastic) Map.")
|
print("Applying Non-Linear (Elastic) Map.")
|
||||||
|
|
||||||
# 4. Apply
|
# 5. Application
|
||||||
count = 0
|
count = 0
|
||||||
for sub in subtitles:
|
for sub in subtitles:
|
||||||
new_start, new_end = sub.start_ms, sub.end_ms
|
new_start, new_end = sub.start_ms, sub.end_ms
|
||||||
@@ -190,7 +180,6 @@ if __name__ == '__main__':
|
|||||||
'subtitles_language=English'
|
'subtitles_language=English'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
# sys.argv = [
|
# sys.argv = [
|
||||||
# 'sync_script.py',
|
# 'sync_script.py',
|
||||||
# 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',
|
# 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
git+https://github.com/absadiki/pywhispercpp
|
faster-whisper
|
||||||
Reference in New Issue
Block a user