feat: add backup option and move some logic from main to utils

This commit is contained in:
2026-01-30 20:24:33 +01:00
parent e1a7b69f76
commit 2a7f1a526d
5 changed files with 71 additions and 41 deletions

View File

@@ -23,4 +23,6 @@ SYNC_CONFIG = {
"jitter_tolerance_ms": 300, "jitter_tolerance_ms": 300,
"min_drift_slope": 0.00005, "min_drift_slope": 0.00005,
"linear_r2_threshold": 0.80, "linear_r2_threshold": 0.80,
}
"create_backup": True,
}

View File

@@ -1,7 +1,9 @@
import os import os
import re import re
import shutil
from typing import List from typing import List
from config import SYNC_CONFIG
from .types import SubtitleEntry from .types import SubtitleEntry
@@ -41,9 +43,28 @@ class SubtitleHandler:
entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip())) entries.append(SubtitleEntry(int(match.group(1)), start, end, match.group(4).strip()))
return entries return entries
@staticmethod
def create_backup(filepath: str):
"""Creates a .bak copy of the subtitles if one doesn't exist."""
backup_path = filepath + ".bak"
if not os.path.exists(backup_path):
try:
shutil.copy2(filepath, backup_path)
print(f"Backup created: {os.path.basename(backup_path)}")
except IOError as e:
print(f"Warning: Could not create backup: {e}")
@staticmethod @staticmethod
def write_srt(filepath: str, entries: List[SubtitleEntry]): def write_srt(filepath: str, entries: List[SubtitleEntry]):
with open(filepath, 'w', encoding='utf-8') as f: # 1. Ensure backup exists before overwriting
for entry in entries: if SYNC_CONFIG['create_backup']:
f.write( SubtitleHandler.create_backup(filepath)
f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
# 2. Overwrite
try:
with open(filepath, 'w', encoding='utf-8') as f:
for entry in entries:
f.write(
f"{entry.index}\n{SubtitleHandler.format_time(entry.start_ms)} --> {SubtitleHandler.format_time(entry.end_ms)}\n{entry.raw_text}\n\n")
except IOError as e:
print(f"Error writing subtitle file: {e}")

18
core/utils.py Normal file
View File

@@ -0,0 +1,18 @@
from .types import SubtitleInfo
def parse_bazarr_args(args: list) -> SubtitleInfo:
"""Parses key=value arguments passed by Bazarr."""
arg_dict = {}
for arg in args[1:]:
if '=' in arg:
key, value = arg.split('=', 1)
arg_dict[key] = value.strip('"').strip("'") # Clean quotes if present
return SubtitleInfo(
episode_path=arg_dict.get('episode', ''),
episode_name=arg_dict.get('episode_name', 'Unknown'),
subtitle_path=arg_dict.get('subtitles', ''),
episode_language=arg_dict.get('episode_language', 'English'),
subtitles_language=arg_dict.get('subtitles_language', 'English')
)

59
main.py
View File

@@ -9,40 +9,28 @@ from core.analysis import Analyzer
from core.matcher import TextMatcher from core.matcher import TextMatcher
from core.media import MediaHandler from core.media import MediaHandler
from core.subtitles import SubtitleHandler from core.subtitles import SubtitleHandler
from core.types import SubtitleInfo, WhisperSegment, AnalysisPoint from core.types import WhisperSegment, AnalysisPoint
from core.utils import parse_bazarr_args
def parse_bazarr_args(args) -> SubtitleInfo:
arg_dict = {}
for arg in args[1:]:
if '=' in arg:
key, value = arg.split('=', 1)
arg_dict[key] = value
return SubtitleInfo(
episode_path=arg_dict.get('episode', ''),
episode_name=arg_dict.get('episode_name', 'Unknown'),
subtitle_path=arg_dict.get('subtitles', ''),
episode_language=arg_dict.get('episode_language', 'English'),
subtitles_language=arg_dict.get('subtitles_language', 'English')
)
def main(): def main():
# 1. Parse Arguments
info = parse_bazarr_args(sys.argv) info = parse_bazarr_args(sys.argv)
if not info.subtitle_path or not os.path.exists(info.subtitle_path):
print("Error: Invalid or missing subtitle path.")
return
print(f"Target: {info.episode_name}") print(f"Target: {info.episode_name}")
# 1. Init # 2. Initialize Resources
audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language) audio_stream = MediaHandler.get_audio_stream_index(info.episode_path, info.episode_language)
media_duration = MediaHandler.get_media_duration(info.episode_path) media_duration = MediaHandler.get_media_duration(info.episode_path)
whisper_lang = MediaHandler.get_language_code(info.episode_language)
# Get the 2-letter code (e.g., "en")
whisper_lang_code = MediaHandler.get_language_code(info.episode_language)
print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...") print(f"Duration: {int(media_duration // 60)}m. Loading Whisper ({SYNC_CONFIG['device']})...")
# Load model based on Config # Load Model
model_name = "base.en" if whisper_lang_code == 'en' else "base" model_name = "base.en" if whisper_lang == 'en' else "base"
try: try:
whisper = WhisperModel( whisper = WhisperModel(
model_name, model_name,
@@ -51,15 +39,12 @@ def main():
cpu_threads=4 cpu_threads=4
) )
except Exception as e: except Exception as e:
print(f"Error loading model: {e}") print(f"CRITICAL: Failed to load Whisper model: {e}")
return return
subtitles = SubtitleHandler.parse_srt(info.subtitle_path) subtitles = SubtitleHandler.parse_srt(info.subtitle_path)
if not subtitles:
print("Error: Subtitle file is empty.")
return
# 2. Scanning Loop # 3. Scanning Loop
usable_duration = media_duration - 60 usable_duration = media_duration - 60
step = usable_duration / (SYNC_CONFIG['sample_count'] + 1) step = usable_duration / (SYNC_CONFIG['sample_count'] + 1)
sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])] sample_starts = [30 + (i * step) for i in range(SYNC_CONFIG['sample_count'])]
@@ -75,14 +60,14 @@ def main():
info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream info.episode_path, int(start_sec), SYNC_CONFIG['scan_duration_sec'], audio_stream
) )
# Optimized Transcribe
segments, _ = whisper.transcribe( segments, _ = whisper.transcribe(
audio_file, audio_file,
vad_filter=SYNC_CONFIG['vad_filter'], vad_filter=SYNC_CONFIG['vad_filter'],
vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']), vad_parameters=dict(min_silence_duration_ms=SYNC_CONFIG['vad_min_silence']),
language=whisper_lang_code, language=whisper_lang,
beam_size=SYNC_CONFIG['beam_size'], beam_size=SYNC_CONFIG['beam_size'],
condition_on_previous_text=False, condition_on_previous_text=False
word_timestamps=False
) )
w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)] w_segments = [WhisperSegment(int(s.start * 1000), int(s.end * 1000), s.text) for s in list(segments)]
@@ -103,10 +88,10 @@ def main():
os.unlink(audio_file) os.unlink(audio_file)
if not raw_points: if not raw_points:
print("FAILED: No sync points found.") print("FAILED: No sync points found. Exiting.")
return return
# 3. Decision # 4. Analysis & Decision
raw_points.sort(key=lambda x: x.timestamp_ms) raw_points.sort(key=lambda x: x.timestamp_ms)
clean_points = Analyzer.filter_outliers(raw_points) clean_points = Analyzer.filter_outliers(raw_points)
@@ -128,17 +113,22 @@ def main():
final_intercept = statistics.median([p.offset_ms for p in clean_points]) final_intercept = statistics.median([p.offset_ms for p in clean_points])
print(f"Applying Global Offset: {final_intercept:+.0f} ms") print(f"Applying Global Offset: {final_intercept:+.0f} ms")
if abs(final_intercept) < 50:
print("Offset is negligible. No changes needed.")
return
elif mode == "LINEAR": elif mode == "LINEAR":
final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points) final_slope, final_intercept, _ = Analyzer.calculate_weighted_regression(clean_points)
print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms") print(f"Applying Linear Correction: Slope={final_slope:.6f}, Base={final_intercept:.0f}ms")
elif mode == "ELASTIC": elif mode == "ELASTIC":
anchors = Analyzer.smooth_points(clean_points) anchors = Analyzer.smooth_points(clean_points)
# Extend anchors to cover 0 to End
final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \ final_anchors = [AnalysisPoint(0, anchors[0].offset_ms, 0)] + anchors + \
[AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)] [AnalysisPoint(int(media_duration * 1000), anchors[-1].offset_ms, 0)]
print("Applying Non-Linear (Elastic) Map.") print("Applying Non-Linear (Elastic) Map.")
# 4. Apply # 5. Application
count = 0 count = 0
for sub in subtitles: for sub in subtitles:
new_start, new_end = sub.start_ms, sub.end_ms new_start, new_end = sub.start_ms, sub.end_ms
@@ -190,7 +180,6 @@ if __name__ == '__main__':
'subtitles_language=English' 'subtitles_language=English'
] ]
# sys.argv = [ # sys.argv = [
# 'sync_script.py', # 'sync_script.py',
# 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv', # 'episode=/home/mathieub/Documents/DEV/PycharmProjects/ai-subtitles-sync/test_data/Superman & Lois - S03E05/Superman & Lois - S03E05 - Head On Bluray-1080p.mkv',

View File

@@ -1 +1 @@
git+https://github.com/absadiki/pywhispercpp faster-whisper