"""Convert AAC 5.1 / 7.1 audio tracks to E-AC-3, copying every other stream.

The script scans a directory tree for ``.mkv`` / ``.mp4`` files, probes each
one with ffprobe, and re-encodes only the AAC tracks that have 6 or 8
channels.  All remaining streams are stream-copied by ffmpeg.
"""
import argparse
import json
import logging
import shutil
import subprocess
import sys
import time
from pathlib import Path

EXTENSIONS = {".mkv", ".mp4"}

# Channel counts (5.1 and 7.1) of AAC tracks that get re-encoded.
# NOTE(review): FFmpeg's native eac3 encoder may not accept 8-channel input;
# confirm that 7.1 sources encode as expected with the ffmpeg build in use.
TARGET_CHANNELS = (6, 8)
# Bitrate requested for every re-encoded E-AC-3 track.
TARGET_BITRATE = "640k"

# ----------------------------
# CLI
# ----------------------------
parser = argparse.ArgumentParser(
    description="Convert AAC 5.1 / 7.1 audio to E-AC-3 without touching other streams"
)
parser.add_argument("path", help="Directory containing media files")
parser.add_argument("--ffmpeg", default="ffmpeg", help="Path to ffmpeg binary")
parser.add_argument("--overwrite", action="store_true", help="Overwrite original files")
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without converting")

# Replaced by the parsed command line in main().  The defaults mirror the
# parser so the helper functions stay usable when the module is imported.
args = argparse.Namespace(path=None, ffmpeg="ffmpeg", overwrite=False, dry_run=False)

# ----------------------------
# Logging
# ----------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="%H:%M:%S"
)
log = logging.getLogger("aac2eac3")


# ----------------------------
# Colors & Formatting
# ----------------------------
class Colors:
    """ANSI escape sequences used to style console output."""

    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    RESET = '\033[0m'


def header(text):
    """Log *text* as a banner framed by two 70-character rules."""
    log.info(f"\n{Colors.BOLD}{Colors.CYAN}{'═' * 70}{Colors.RESET}")
    log.info(f"{Colors.BOLD}{Colors.CYAN} {text}{Colors.RESET}")
    log.info(f"{Colors.BOLD}{Colors.CYAN}{'═' * 70}{Colors.RESET}\n")


def section(text):
    """Log *text* as a section heading."""
    log.info(f"\n{Colors.BOLD}{Colors.BLUE}▶ {text}{Colors.RESET}")


def success(text):
    """Log *text* as a success message."""
    log.info(f"{Colors.GREEN}✓ {text}{Colors.RESET}")


def info(text, indent=0):
    """Log *text* dimmed, prefixed by *indent* spaces."""
    prefix = " " * indent
    log.info(f"{prefix}{Colors.DIM}{text}{Colors.RESET}")


def warning(text):
    """Log *text* as a warning (emitted at INFO level)."""
    log.info(f"{Colors.YELLOW}⚠ {text}{Colors.RESET}")


def error(text):
    """Log *text* as an error (emitted at INFO level)."""
    log.info(f"{Colors.RED}✗ {text}{Colors.RESET}")


def format_size(bytes):
    """Return a byte count as a string with one decimal and a unit (B..TB).

    The parameter keeps its original name for backward compatibility even
    though it shadows the builtin; the work is done on a local copy.
    """
    size = float(bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"


def format_time(seconds):
    """Return a duration in seconds as ``"Ns"``, ``"Nm Ns"`` or ``"Nh Nm"``.

    The value is rounded to whole seconds before it is split, so a carry
    propagates into the minutes (119.7 gives "2m 0s", never "1m 60s").
    """
    total = int(round(seconds))
    if total < 60:
        return f"{total}s"
    if total < 3600:
        return f"{total // 60}m {total % 60}s"
    return f"{total // 3600}h {(total % 3600) // 60}m"


# ----------------------------
# Stats Tracking
# ----------------------------
class Stats:
    """Counters and timings accumulated over one run."""

    def __init__(self):
        self.total_files = 0
        self.processed = 0
        self.skipped = 0
        self.converted = 0
        self.failed = 0
        self.start_time = time.time()
        # Wall-clock seconds of each ffmpeg invocation, failed ones included.
        self.conversion_times = []

    def print_summary(self):
        """Log the counters, the total elapsed time and the average time."""
        elapsed = time.time() - self.start_time
        header("Conversion Summary")
        log.info(f" {Colors.BOLD}Total files scanned:{Colors.RESET} {self.total_files}")
        log.info(f" {Colors.GREEN}{Colors.BOLD}Successfully converted:{Colors.RESET} {self.converted}")
        log.info(f" {Colors.YELLOW}{Colors.BOLD}Skipped (no AAC):{Colors.RESET} {self.skipped}")
        if self.failed > 0:
            log.info(f" {Colors.RED}{Colors.BOLD}Failed:{Colors.RESET} {self.failed}")
        log.info(f"\n {Colors.BOLD}Total time:{Colors.RESET} {format_time(elapsed)}")
        if self.conversion_times:
            avg_time = sum(self.conversion_times) / len(self.conversion_times)
            log.info(f" {Colors.BOLD}Average per file:{Colors.RESET} {format_time(avg_time)}")
        log.info("")


stats = Stats()


# ----------------------------
# Helpers
# ----------------------------
def ffprobe_path():
    """Derive the ffprobe path from ``args.ffmpeg``.

    "ffmpeg" in the binary's file name is replaced by "ffprobe" and the
    directory part is kept.
    """
    ffmpeg = Path(args.ffmpeg)
    return str(ffmpeg.parent / ffmpeg.name.replace("ffmpeg", "ffprobe"))


def probe_streams(file: Path):
    """Return the list of stream dicts ffprobe reports for *file*.

    Raises:
        RuntimeError: if ffprobe's output is not valid JSON.  The message
            carries ffprobe's stderr so the caller can show the real cause.
    """
    cmd = [
        ffprobe_path(), "-v", "error",
        "-show_entries",
        "stream=index,codec_type,codec_name,channels:stream_tags=language,title",
        "-of", "json",
        str(file)
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as err:
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        raise RuntimeError(f"ffprobe failed: {detail}") from err
    return data.get("streams", [])


def build_output_name(file: Path):
    """Return the path ffmpeg should write to for *file*.

    With ``--overwrite`` this is a temporary sibling (``name.tmp.ext``);
    otherwise it is ``name.eac.ext``.
    """
    if args.overwrite:
        return file.with_suffix(".tmp" + file.suffix)
    return file.with_name(f"{file.stem}.eac{file.suffix}")


def plan_codecs(streams):
    """Decide the codec for each probed stream.

    Returns:
        A ``(codec_map, target_bitrates, aac_tracks)`` tuple.  ``codec_map``
        maps stream index to ``"eac3"`` or ``"copy"`` for audio, video and
        subtitle streams; ``target_bitrates`` maps each re-encoded index to
        its bitrate; ``aac_tracks`` lists the stream dicts to re-encode.
    """
    codec_map = {}
    target_bitrates = {}
    aac_tracks = []
    for s in streams:
        idx = s.get("index")
        if idx is None:
            continue
        kind = s.get("codec_type")
        # Only re-encode AAC 5.1 or 7.1
        if (kind == "audio" and s.get("codec_name") == "aac"
                and s.get("channels") in TARGET_CHANNELS):
            codec_map[idx] = "eac3"
            target_bitrates[idx] = TARGET_BITRATE
            aac_tracks.append(s)
        elif kind in ("audio", "video", "subtitle"):
            codec_map[idx] = "copy"
    return codec_map, target_bitrates, aac_tracks


def build_ffmpeg_command(file: Path, output: Path, codec_map, target_bitrates):
    """Build the ffmpeg argument list that converts *file* into *output*."""
    cmd = [args.ffmpeg, "-y", "-hide_banner", "-loglevel", "error", "-stats",
           "-i", str(file)]
    # "-c copy" is the baseline for everything "-map 0" pulls in, including
    # stream types that are absent from codec_map.
    cmd += ["-map", "0", "-c", "copy"]
    for idx, codec in codec_map.items():
        if codec == "copy":
            continue
        cmd += [f"-c:{idx}", codec]
        if idx in target_bitrates:
            # idx is an absolute stream index, so the specifier must be
            # ":idx".  ":a:idx" would count audio streams only and select a
            # different stream.
            cmd += [f"-b:{idx}", target_bitrates[idx]]
    cmd += ["-map_metadata", "0", str(output)]
    return cmd


def replace_original(file: Path, output: Path):
    """Move *output* over *file*, restoring the original if the move fails."""
    backup = file.with_suffix(file.suffix + ".bak")
    shutil.move(file, backup)
    try:
        shutil.move(output, file)
    except Exception:
        # Put the original back under its own name before propagating.
        shutil.move(backup, file)
        raise
    backup.unlink(missing_ok=True)


# ----------------------------
# Conversion
# ----------------------------
def convert(file: Path):
    """Probe *file* and re-encode its AAC 5.1 / 7.1 tracks to E-AC-3.

    Updates the module-level ``stats``.  Files without a matching track are
    counted as skipped.  In dry-run mode the plan is logged and the file is
    counted as converted without running ffmpeg.
    """
    stats.processed += 1
    section(f"[{stats.processed}/{stats.total_files}] {file.name}")

    streams = probe_streams(file)
    if not streams:
        warning("No streams found")
        stats.skipped += 1
        return

    codec_map, target_bitrates, aac_tracks = plan_codecs(streams)
    if not aac_tracks:
        info("No AAC 5.1/7.1 tracks found - skipping", 1)
        stats.skipped += 1
        return

    # Show what will be converted
    for s in aac_tracks:
        channels = s.get("channels", 0)
        tags = s.get("tags", {})
        lang = tags.get("language", "und")
        title = tags.get("title", "")
        track_info = f"Track {s['index']}: AAC {channels}ch"
        if lang != "und":
            track_info += f" ({lang})"
        if title:
            track_info += f" - {title}"
        info(f"{track_info} → E-AC-3 @ {TARGET_BITRATE}", 1)

    output = build_output_name(file)

    if args.dry_run:
        if args.overwrite:
            info(f"Would replace: {Colors.CYAN}{file.name}{Colors.RESET}", 1)
        else:
            info(f"Would create: {Colors.CYAN}{output.name}{Colors.RESET}", 1)
        stats.converted += 1
        return

    cmd = build_ffmpeg_command(file, output, codec_map, target_bitrates)
    info(f"Converting → {Colors.CYAN}{output.name}{Colors.RESET}", 1)

    start_time = time.time()
    try:
        # errors="replace" keeps undecodable bytes in ffmpeg's output from
        # aborting the progress loop.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, text=True,
                              errors="replace") as process:
            # Show FFmpeg progress
            for line in process.stdout:
                if line.strip():
                    print(f" {Colors.DIM}{line.rstrip()}{Colors.RESET}")
    except KeyboardInterrupt:
        # Do not leave a truncated output file behind.
        output.unlink(missing_ok=True)
        raise
    elapsed = time.time() - start_time
    stats.conversion_times.append(elapsed)

    if process.returncode != 0:
        error(f"FFmpeg failed (exit code {process.returncode})")
        stats.failed += 1
        output.unlink(missing_ok=True)
        return

    if args.overwrite:
        replace_original(file, output)
        info("Original file replaced", 1)

    # Show file sizes
    final_file = file if args.overwrite else output
    if final_file.exists():
        size = format_size(final_file.stat().st_size)
        success(f"Completed in {format_time(elapsed)} ({size})")
        stats.converted += 1
    else:
        error("FFmpeg reported success but produced no output file")
        stats.failed += 1


# ----------------------------
# Main
# ----------------------------
def main():
    """Parse the command line, scan the directory and convert each file."""
    global args
    args = parser.parse_args()

    header("AAC to E-AC-3 Converter")

    root = Path(args.path).resolve()
    if not root.is_dir():
        error(f"Invalid directory: {root}")
        raise SystemExit(1)

    info(f"Scanning: {Colors.CYAN}{root}{Colors.RESET}")
    files = sorted(
        f for f in root.rglob("*")
        if f.suffix.lower() in EXTENSIONS and f.is_file()
    )
    if not files:
        warning("No media files found")
        return

    stats.total_files = len(files)
    log.info(f"Found {Colors.BOLD}{len(files)}{Colors.RESET} media files")

    if args.dry_run:
        log.info(f"\n{Colors.YELLOW}{Colors.BOLD}DRY RUN MODE{Colors.RESET} - No files will be modified\n")
    if args.overwrite:
        log.info(f"{Colors.YELLOW}Overwrite mode enabled{Colors.RESET} - Original files will be replaced\n")

    for file in files:
        try:
            convert(file)
        except KeyboardInterrupt:
            log.info(f"\n\n{Colors.YELLOW}Interrupted by user{Colors.RESET}")
            stats.print_summary()
            sys.exit(130)
        except Exception as e:
            # Per-file boundary: report the failure and keep going.
            error(f"Unexpected error: {e}")
            stats.failed += 1

    stats.print_summary()


if __name__ == "__main__":
    main()