176 lines
5.1 KiB
Python
176 lines
5.1 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
EXTENSIONS = {".mkv", ".mp4"}
|
|
|
|
# ----------------------------
|
|
# CLI
|
|
# ----------------------------
|
|
parser = argparse.ArgumentParser(
|
|
description="Convert AAC 5.1 / 7.1 audio to E-AC-3 without touching other streams"
|
|
)
|
|
parser.add_argument("path", help="Directory containing media files")
|
|
parser.add_argument("--ffmpeg", default="ffmpeg", help="Path to ffmpeg binary")
|
|
parser.add_argument("--overwrite", action="store_true", help="Overwrite original files")
|
|
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without converting")
|
|
args = parser.parse_args()
|
|
|
|
# ----------------------------
|
|
# Logging
|
|
# ----------------------------
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s | %(levelname)-7s | %(message)s",
|
|
datefmt="%H:%M:%S"
|
|
)
|
|
log = logging.getLogger("aac2eac3")
|
|
|
|
|
|
# ----------------------------
|
|
# Helpers
|
|
# ----------------------------
|
|
|
|
def ffprobe_path():
|
|
ffmpeg = Path(args.ffmpeg)
|
|
return str(ffmpeg.parent / ffmpeg.name.replace("ffmpeg", "ffprobe"))
|
|
|
|
|
|
def probe_streams(file: Path):
|
|
"""Get info about all streams in a file."""
|
|
cmd = [
|
|
ffprobe_path(),
|
|
"-v", "error",
|
|
"-show_entries", "stream=index,codec_type,codec_name,channels:stream_tags=language,title",
|
|
"-of", "json",
|
|
str(file)
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
data = json.loads(result.stdout)
|
|
return data.get("streams", [])
|
|
|
|
|
|
def build_output_name(file: Path):
|
|
if args.overwrite:
|
|
return file.with_suffix(".tmp" + file.suffix)
|
|
return file.with_name(f"{file.stem}.eac{file.suffix}")
|
|
|
|
|
|
# ----------------------------
|
|
# Conversion
|
|
# ----------------------------
|
|
|
|
def convert(file: Path):
|
|
streams = probe_streams(file)
|
|
if not streams:
|
|
log.warning("No streams found: %s", file.name)
|
|
return
|
|
|
|
# Build FFmpeg codec maps
|
|
codec_map = {}
|
|
target_bitrates = {}
|
|
has_aac_multichannel = False
|
|
|
|
for s in streams:
|
|
if s["codec_type"] == "audio":
|
|
# Only re-encode AAC 5.1 or 7.1
|
|
if s["codec_name"] == "aac" and s.get("channels") in (6, 8):
|
|
codec_map[s["index"]] = "eac3"
|
|
target_bitrates[s["index"]] = "640k"
|
|
has_aac_multichannel = True
|
|
else:
|
|
codec_map[s["index"]] = "copy"
|
|
elif s["codec_type"] == "video":
|
|
codec_map[s["index"]] = "copy"
|
|
elif s["codec_type"] == "subtitle":
|
|
codec_map[s["index"]] = "copy"
|
|
|
|
if not has_aac_multichannel:
|
|
log.info("Skipping %s (no AAC 5.1/7.1 tracks)", file.name)
|
|
return
|
|
|
|
output = build_output_name(file)
|
|
|
|
if args.dry_run:
|
|
log.info("[DRY RUN] Would convert %s → %s", file.name, output.name)
|
|
for s in streams:
|
|
if s["codec_type"] == "audio" and s["index"] in codec_map:
|
|
action = codec_map[s["index"]]
|
|
if action == "eac3":
|
|
log.info(" [DRY RUN] Stream %d: %s %dch → E-AC-3 @ %s",
|
|
s["index"], s["codec_name"], s.get("channels", 0),
|
|
target_bitrates[s["index"]])
|
|
else:
|
|
log.info(" [DRY RUN] Stream %d: %s (copy)",
|
|
s["index"], s["codec_name"])
|
|
return
|
|
|
|
cmd = [args.ffmpeg, "-y", "-hide_banner", "-loglevel", "info", "-stats", "-i", str(file)]
|
|
|
|
# Map all streams
|
|
cmd += ["-map", "0"]
|
|
|
|
# Set codecs
|
|
for idx, c in codec_map.items():
|
|
cmd += [f"-c:{idx}", c]
|
|
if c == "eac3":
|
|
cmd += [f"-b:a:{idx}", target_bitrates[idx]]
|
|
|
|
# Preserve metadata
|
|
cmd += ["-map_metadata", "0", str(output)]
|
|
|
|
log.info("Converting %s → %s", file.name, output.name)
|
|
|
|
process = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
|
|
process.wait()
|
|
|
|
if process.returncode != 0:
|
|
log.error("FFmpeg failed: %s", file.name)
|
|
if output.exists():
|
|
output.unlink(missing_ok=True)
|
|
return
|
|
|
|
if args.overwrite:
|
|
backup = file.with_suffix(file.suffix + ".bak")
|
|
log.info("Replacing original file")
|
|
shutil.move(file, backup)
|
|
shutil.move(output, file)
|
|
backup.unlink(missing_ok=True)
|
|
|
|
log.info("Completed: %s", file.name)
|
|
|
|
|
|
# ----------------------------
|
|
# Main
|
|
# ----------------------------
|
|
|
|
def main():
|
|
root = Path(args.path).resolve()
|
|
if not root.is_dir():
|
|
raise SystemExit(f"Invalid directory: {root}")
|
|
|
|
files = sorted(f for f in root.rglob("*") if f.suffix.lower() in EXTENSIONS)
|
|
if not files:
|
|
log.warning("No media files found")
|
|
return
|
|
|
|
log.info("Found %d files", len(files))
|
|
if args.dry_run:
|
|
log.info("=== DRY RUN MODE - No files will be modified ===")
|
|
|
|
for file in files:
|
|
try:
|
|
convert(file)
|
|
except KeyboardInterrupt:
|
|
log.warning("Interrupted by user")
|
|
sys.exit(130)
|
|
except Exception:
|
|
log.exception("Unexpected error while processing %s", file.name)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |