import os
import shutil
import subprocess
from pathlib import Path
from typing import Union, List, Optional

import psutil

from core import utils, screen
from core.vars import logger, PYTHON_EXEC
from ui import choices


class Stack:
    """
    Base class for a managed stack living under ``~/.ai-suite-rocm/<id>``.

    Provides install/update/uninstall/start/stop lifecycle handling plus
    helpers for running shell, Python, pip and git commands inside the stack
    directory. Subclasses customise behaviour by overriding ``_install``,
    ``_update`` and ``_start``.
    """

    def __init__(self, name: str, id: str, port: int, url: str):
        """
        Initialize the Stack instance.

        :param name: The name of the stack
        :param id: A unique identifier for the stack
        :param port: The port the stack service uses
        :param url: The URL associated with the stack
        """
        self.name = name
        self.id = id
        self.path = Path.home() / ".ai-suite-rocm" / id
        self.url = url
        self.port = port

        self.pid_file = self.path / ".pid"
        self.pid = self.read_pid()

    def read_pid(self) -> Optional[int]:
        """
        Read the PID from the PID file, if it exists.

        :return: The PID as an integer, or None if the file does not exist,
                 cannot be read, or does not contain a valid integer.
        """
        try:
            return int(self.pid_file.read_text().strip())
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as err:
            # A corrupt/unreadable PID file must not prevent the stack object
            # from being created; treat it as "no known process".
            logger.warning(f"Ignoring unreadable PID file {self.pid_file}: {err}")
            return None

    def write_pid(self, pid: int) -> None:
        """
        Write the PID to the PID file and remember it on the instance.

        :param pid: Process ID to write
        """
        self.pid_file.write_text(str(pid))
        # Keep the in-memory PID in sync so status() is correct without
        # having to re-create the Stack object.
        self.pid = pid

    def remove_pid_file(self) -> None:
        """Remove the PID file if it exists and forget the stored PID."""
        self.pid_file.unlink(missing_ok=True)
        self.pid = None

    def install(self) -> None:
        """
        Install the stack, creating the virtual environment and performing
        initial setup. If the stack is already installed, update it instead.
        """
        if self.is_installed():
            self.update()
            return

        self.check_for_broken_install()
        self.create_dir('')
        self.create_venv()
        self._install()

        # Marker file written last so that a crashed install is detectable.
        self.create_file('.installed', 'true')
        logger.info(f"Installed {self.name}")

    def _install(self) -> None:
        """Additional installation steps specific to the stack (override as needed)."""
        pass

    def is_installed(self) -> bool:
        """
        Check if the stack is installed by verifying the existence of the
        '.installed' file.

        :return: True if the stack is installed, False otherwise.
        """
        return self.file_exists('.installed')

    def check_for_broken_install(self) -> None:
        """Check for a broken installation and clean up any leftover files."""
        if not self.is_installed() and self.path.exists():
            logger.warning("Found files from a previous/broken/crashed installation, cleaning up...")
            self.remove_dir('')

    def update(self, folder: str = 'webui') -> None:
        """
        Update the stack by pulling the latest changes from the repository.

        A running stack is stopped before the update and started again
        afterwards. Symlinks found in the stack directory are re-created after
        the pull.

        :param folder: Repository folder (inside the stack directory) to pull
        """
        if not self.is_installed():
            logger.warning(f"Could not update {self.name} as {self.name} is not installed")
            return

        was_running = self.status()
        if was_running:
            self.stop()

        logger.info(f"Updating {self.name}")
        symlinks = utils.find_symlink_in_folder(self.path)
        self.git_pull(folder)
        self._update()
        utils.create_symlinks(symlinks)

        if was_running:
            self.start()

    def _update(self) -> None:
        """Additional update steps specific to the stack (override as needed)."""
        pass

    def uninstall(self) -> None:
        """Uninstall the stack by stopping it and removing its files."""
        logger.info(f"Uninstalling {self.name}")
        if self.status():
            self.stop()

        # Removed directly instead of via bash(): bash() first changes into
        # the stack directory, which fails when it is already gone.
        if self.path.exists():
            shutil.rmtree(self.path)
        self.remove_pid_file()

    def start(self) -> None:
        """Start the stack service if it is installed and not already running."""
        if self.status():
            logger.warning(f"{self.name} is already running")
            return

        if self.is_installed():
            self._start()
        else:
            logger.error(f"{self.name} is not installed")

    def _start(self) -> None:
        """Additional start steps specific to the stack (override as needed)."""
        pass

    def stop(self) -> None:
        """
        Stop the stack service by terminating the associated process.

        The process is asked to terminate and given 5 seconds to exit before
        it is killed. The PID file is removed afterwards.
        """
        if not self.status():
            logger.warning(f"{self.name} is not running")
            return

        logger.debug(f"Stopping {self.name} with PID: {self.pid}")
        try:
            proc = psutil.Process(self.pid)
            proc.terminate()  # Graceful shutdown
            proc.wait(timeout=5)
        except psutil.NoSuchProcess:
            # The process vanished between status() and here: nothing to kill.
            logger.debug(f"{self.name} already exited")
        except psutil.TimeoutExpired:
            logger.warning(f"{self.name} did not terminate gracefully, forcing kill")
            try:
                proc.kill()
            except psutil.NoSuchProcess:
                pass  # Exited on its own right after the timeout.
        self.remove_pid_file()

    def restart(self) -> None:
        """Restart the stack service."""
        self.stop()
        self.start()

    def status(self) -> bool:
        """
        Check if the stack service is running.

        :return: True if a PID is known and a process with that PID exists,
                 False otherwise.
        """
        return self.pid is not None and psutil.pid_exists(self.pid)

    def create_venv(self) -> None:
        """Create a Python virtual environment for the stack."""
        venv_path = self.path / 'venv'
        if self.has_venv():
            logger.debug(f"Venv already exists for {self.name}")
            return

        logger.info(f"Creating venv for {self.name}")
        self.bash(f"{PYTHON_EXEC} -m venv {venv_path} --system-site-packages")
        self.pip_install("pip")

    def has_venv(self) -> bool:
        """
        Check if the virtual environment exists.

        :return: True if the virtual environment exists, False otherwise.
        """
        return (self.path / 'venv').exists()

    def pip_install(self, package: Union[str, List[str]], no_deps: bool = False,
                    env: Optional[List[str]] = None, args: Optional[List[str]] = None) -> None:
        """
        Install a Python package or list of packages using pip.

        :param package: Package specifier, or list of specifiers installed one by one
        :param no_deps: If True, pass ``--no-deps`` to pip
        :param env: Environment assignments (``KEY=value``) prefixed to the command
        :param args: Extra arguments appended to the pip command
        """
        # Work on a copy: the caller's list (or a shared default) must never
        # be mutated, otherwise "--no-deps" leaks into later calls.
        pip_args = list(args) if args else []
        if no_deps:
            pip_args.append("--no-deps")

        packages = package if isinstance(package, list) else [package]
        for p in packages:
            logger.info(f"Installing {p}")
            self.pip(f"install -U {p}", env=env, args=pip_args)

    def install_requirements(self, filename: str = 'requirements.txt',
                             env: Optional[List[str]] = None) -> None:
        """
        Install requirements from a given file.

        :param filename: Requirements file passed to ``pip install -r``
        :param env: Environment assignments (``KEY=value``) prefixed to the command
        """
        logger.info(f"Installing requirements for {self.name} ({filename})")
        self.pip(f"install -r {filename}", env=env)

    def pip(self, cmd: str, env: Optional[List[str]] = None, args: Optional[List[str]] = None,
            current_dir: Optional[Path] = None) -> None:
        """
        Run pip with a given command inside the stack's virtual environment.

        :param cmd: pip sub-command and its arguments (e.g. ``install foo``)
        :param env: Environment assignments (``KEY=value``) prefixed to the command
        :param args: Extra arguments appended to the command
        :param current_dir: Directory to run in (defaults to the stack directory)
        """
        self.python(f"-m pip {cmd}", env=env, args=args, current_dir=current_dir)

    def python(self, cmd: str, env: Optional[List[str]] = None, args: Optional[List[str]] = None,
               current_dir: Optional[Path] = None, daemon: bool = False) -> None:
        """
        Run a Python command inside the stack's virtual environment.

        :param cmd: Arguments passed to the venv's python executable
        :param env: Environment assignments (``KEY=value``) prefixed to the command
        :param args: Extra arguments appended to the command
        :param current_dir: Directory to run in (defaults to the stack directory)
        :param daemon: If True, run the command as a daemon (see ``bash``)
        """
        env_prefix = ' '.join(env or [])
        extra_args = ' '.join(args or [])
        interpreter = self.path / 'venv' / 'bin' / 'python'
        self.bash(f"{env_prefix} {interpreter} {cmd} {extra_args}", current_dir, daemon)

    def bash(self, cmd: str, current_dir: Optional[Path] = None, daemon: bool = False) -> None:
        """
        Run a bash command, optionally as a daemon.

        In daemon mode the command is sent to a screen session named after the
        stack id and the session's PID is recorded. If the stack is already
        running the user is asked whether to restart it. Otherwise the command
        runs synchronously.

        :param cmd: Shell command to run
        :param current_dir: Directory to run in; relative paths are resolved
                            against the stack directory, which is also the default
        :param daemon: If True, run the command as a daemon
        :raises Exception: If a non-daemon command exits with a non-zero status
        """
        # Anchor relative directories to the stack path; joining with an
        # absolute path leaves that absolute path unchanged.
        work_dir = self.path / current_dir if current_dir else self.path
        full_cmd = f"cd {work_dir} && {cmd}"

        if daemon:
            if self.status():
                if choices.already_running.ask() is True:
                    self.stop()
                    self._start()
                return

            logger.debug(f"Running command as daemon: {full_cmd}")
            screen_session = screen.create(name=self.id)
            screen_session.send(f"'{full_cmd}'")
            self.write_pid(screen_session.pid)
            return

        logger.debug(f"Running command: {full_cmd}")
        result = subprocess.run(full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            # errors="replace" so that undecodable output cannot mask the
            # real failure with a UnicodeDecodeError.
            logger.fatal(f"Failed to run command: {full_cmd}")
            logger.fatal(f"Error: {result.stderr.decode('utf-8', errors='replace')}")
            logger.fatal(f"Output: {result.stdout.decode('utf-8', errors='replace')}")
            raise Exception(f"Failed to run command: {full_cmd}")

    def git_clone(self, url: str, branch: Optional[str] = None, dest: Optional[Path] = None) -> None:
        """
        Clone a git repository into the stack directory.

        :param url: Repository URL
        :param branch: Branch to check out (git's default if omitted)
        :param dest: Destination directory (git's default if omitted)
        """
        logger.info(f"Cloning {url}")
        self.bash(f"git clone {f'-b {branch}' if branch else ''} {url} {dest or ''}")

    def git_pull(self, repo_folder: str, force: bool = False) -> None:
        """
        Pull changes from a git repository, discarding local modifications.

        :param repo_folder: Repository folder to pull in
        :param force: If True, also remove untracked files and directories
        """
        self.bash(f"git reset --hard HEAD {'&& git clean -f -d' if force else ''} && git pull", Path(repo_folder))

    def install_from_prebuilt(self, name: str) -> None:
        """
        Install the first prebuilt package whose name prefix matches.

        The prefix is the part of the prebuilt's name before the first '-'.

        :param name: Prefix to look for
        """
        for prebuilt in utils.get_prebuilts():
            if prebuilt['name'].split("-")[0] == name:
                self.pip(f"install {prebuilt['browser_download_url']}")
                return
        logger.warning(f"No prebuilt package found for {name}")

    def create_file(self, name: str, content: str) -> None:
        """Create a file (relative to the stack directory) with the given content."""
        (self.path / name).write_text(content)

    def create_dir(self, name: str) -> None:
        """Create a directory (relative to the stack directory), including parents."""
        dir_path = self.path / name
        logger.debug(f"Creating directory {dir_path}")
        dir_path.mkdir(parents=True, exist_ok=True)

    def remove_file(self, name: str) -> None:
        """Remove a file (relative to the stack directory)."""
        logger.debug(f"Removing file {name}")
        os.remove(os.path.join(self.path, name))

    def remove_dir(self, name: str) -> None:
        """
        Remove a directory (relative to the stack directory).

        :param name: Directory name; an empty string removes the whole stack directory
        """
        logger.debug(f"Removing directory {name or self.path}")
        target = self.path / name if name else self.path
        shutil.rmtree(target)

    def move_file_or_dir(self, src: str, dest: str) -> None:
        """Move a file or directory (both relative to the stack directory)."""
        logger.debug(f"Moving file/dir {src} to {dest}")
        os.rename(os.path.join(self.path, src), os.path.join(self.path, dest))

    def move_all_files_in_dir(self, src: str, dest: str) -> None:
        """Move every entry of one directory into another (both relative to the stack directory)."""
        logger.debug(f"Moving all files in directory {src} to {dest}")
        src_dir = os.path.join(self.path, src)
        dest_dir = os.path.join(self.path, dest)
        for entry in os.listdir(src_dir):
            os.rename(os.path.join(src_dir, entry), os.path.join(dest_dir, entry))

    def file_exists(self, name: str) -> bool:
        """Check if a path (relative to the stack directory) exists."""
        return (self.path / name).exists()

    def dir_exists(self, name: str) -> bool:
        """Check if a path (relative to the stack directory) exists."""
        return (self.path / name).exists()

    def remove_line_in_file(self, contains: Union[str, List[str]], file: str) -> None:
        """
        Remove lines matching one or more patterns from a file using ``sed``.

        :param contains: Pattern, or list of patterns, inserted into a sed
                         ``/pattern/d`` expression
        :param file: Target file, relative to the stack directory
        """
        target_file = self.path / file
        logger.debug(f"Removing lines containing {contains} in {target_file}")
        patterns = contains if isinstance(contains, list) else [contains]
        for pattern in patterns:
            self.bash(f"sed -i '/{pattern}/d' {target_file}")

    def replace_line_in_file(self, match: str, replace: str, file: str) -> None:
        """
        Replace every occurrence of a pattern in a file using ``sed``.

        :param match: Pattern inserted into a sed ``s/match/replace/g`` expression
        :param replace: Replacement text
        :param file: Target file, relative to the stack directory
        """
        target_file = self.path / file
        logger.debug(f"Replacing lines containing {match} with {replace} in {target_file}")
        self.bash(f"sed -i 's/{match}/{replace}/g' {target_file}")