feat: initial commit
This commit is contained in:
175
main.py
Normal file
175
main.py
Normal file
@@ -0,0 +1,175 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
EXTENSIONS = {".mkv", ".mp4"}
|
||||
|
||||
# ----------------------------
|
||||
# CLI
|
||||
# ----------------------------
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Convert AAC 5.1 / 7.1 audio to E-AC-3 without touching other streams"
|
||||
)
|
||||
parser.add_argument("path", help="Directory containing media files")
|
||||
parser.add_argument("--ffmpeg", default="ffmpeg", help="Path to ffmpeg binary")
|
||||
parser.add_argument("--overwrite", action="store_true", help="Overwrite original files")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without converting")
|
||||
args = parser.parse_args()
|
||||
|
||||
# ----------------------------
|
||||
# Logging
|
||||
# ----------------------------
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s | %(levelname)-7s | %(message)s",
|
||||
datefmt="%H:%M:%S"
|
||||
)
|
||||
log = logging.getLogger("aac2eac3")
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Helpers
|
||||
# ----------------------------
|
||||
|
||||
def ffprobe_path():
|
||||
return args.ffmpeg.replace("ffmpeg", "ffprobe")
|
||||
|
||||
|
||||
def probe_streams(file: Path):
|
||||
"""Get info about all streams in a file."""
|
||||
cmd = [
|
||||
ffprobe_path(),
|
||||
"-v", "error",
|
||||
"-show_entries", "stream=index,codec_type,codec_name,channels:stream_tags=language,title",
|
||||
"-of", "json",
|
||||
str(file)
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("streams", [])
|
||||
|
||||
|
||||
def build_output_name(file: Path):
|
||||
if args.overwrite:
|
||||
return file.with_suffix(".tmp" + file.suffix)
|
||||
return file.with_name(f"{file.stem}.eac{file.suffix}")
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Conversion
|
||||
# ----------------------------
|
||||
|
||||
def convert(file: Path):
|
||||
streams = probe_streams(file)
|
||||
if not streams:
|
||||
log.warning("No streams found: %s", file.name)
|
||||
return
|
||||
|
||||
# Build FFmpeg codec maps
|
||||
codec_map = {}
|
||||
target_bitrates = {}
|
||||
has_aac_multichannel = False
|
||||
|
||||
for s in streams:
|
||||
if s["codec_type"] == "audio":
|
||||
# Only re-encode AAC 5.1 or 7.1
|
||||
if s["codec_name"] == "aac" and s.get("channels") in (6, 8):
|
||||
codec_map[s["index"]] = "eac3"
|
||||
target_bitrates[s["index"]] = "640k" if s["channels"] == 6 else "1024k"
|
||||
has_aac_multichannel = True
|
||||
else:
|
||||
codec_map[s["index"]] = "copy"
|
||||
elif s["codec_type"] == "video":
|
||||
codec_map[s["index"]] = "copy"
|
||||
elif s["codec_type"] == "subtitle":
|
||||
codec_map[s["index"]] = "copy"
|
||||
|
||||
if not has_aac_multichannel:
|
||||
log.info("Skipping %s (no AAC 5.1/7.1 tracks)", file.name)
|
||||
return
|
||||
|
||||
output = build_output_name(file)
|
||||
|
||||
if args.dry_run:
|
||||
log.info("[DRY RUN] Would convert %s → %s", file.name, output.name)
|
||||
for s in streams:
|
||||
if s["codec_type"] == "audio" and s["index"] in codec_map:
|
||||
action = codec_map[s["index"]]
|
||||
if action == "eac3":
|
||||
log.info(" [DRY RUN] Stream %d: %s %dch → E-AC-3 @ %s",
|
||||
s["index"], s["codec_name"], s.get("channels", 0),
|
||||
target_bitrates[s["index"]])
|
||||
else:
|
||||
log.info(" [DRY RUN] Stream %d: %s (copy)",
|
||||
s["index"], s["codec_name"])
|
||||
return
|
||||
|
||||
cmd = [args.ffmpeg, "-y", "-hide_banner", "-loglevel", "info", "-stats", "-i", str(file)]
|
||||
|
||||
# Map all streams
|
||||
cmd += ["-map", "0"]
|
||||
|
||||
# Set codecs
|
||||
for idx, c in codec_map.items():
|
||||
cmd += [f"-c:{idx}", c]
|
||||
if c == "eac3":
|
||||
cmd += [f"-b:a:{idx}", target_bitrates[idx]]
|
||||
|
||||
# Preserve metadata
|
||||
cmd += ["-map_metadata", "0", str(output)]
|
||||
|
||||
log.info("Converting %s → %s", file.name, output.name)
|
||||
|
||||
process = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
|
||||
process.wait()
|
||||
|
||||
if process.returncode != 0:
|
||||
log.error("FFmpeg failed: %s", file.name)
|
||||
if output.exists():
|
||||
output.unlink(missing_ok=True)
|
||||
return
|
||||
|
||||
if args.overwrite:
|
||||
backup = file.with_suffix(file.suffix + ".bak")
|
||||
log.info("Replacing original file")
|
||||
shutil.move(file, backup)
|
||||
shutil.move(output, file)
|
||||
backup.unlink(missing_ok=True)
|
||||
|
||||
log.info("Completed: %s", file.name)
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Main
|
||||
# ----------------------------
|
||||
|
||||
def main():
|
||||
root = Path(args.path).resolve()
|
||||
if not root.is_dir():
|
||||
raise SystemExit(f"Invalid directory: {root}")
|
||||
|
||||
files = sorted(f for f in root.rglob("*") if f.suffix.lower() in EXTENSIONS)
|
||||
if not files:
|
||||
log.warning("No media files found")
|
||||
return
|
||||
|
||||
log.info("Found %d files", len(files))
|
||||
if args.dry_run:
|
||||
log.info("=== DRY RUN MODE - No files will be modified ===")
|
||||
|
||||
for file in files:
|
||||
try:
|
||||
convert(file)
|
||||
except KeyboardInterrupt:
|
||||
log.warning("Interrupted by user")
|
||||
sys.exit(130)
|
||||
except Exception:
|
||||
log.exception("Unexpected error while processing %s", file.name)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user