diff --git a/requirements.txt b/requirements.txt index e69de29..370c825 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1,2 @@ +fabric~=3.1.0 +patchwork~=1.0.1 \ No newline at end of file diff --git a/run.py b/run.py index 31779b9..d5c0bb8 100644 --- a/run.py +++ b/run.py @@ -4,15 +4,23 @@ import logging from src import main if __name__ == '__main__': - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(description="Deploy JSON file as LXCs and VMs to your proxmox server") parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true") + parser.add_argument("--host", help="host to use for ssh", type=str) + parser.add_argument("--username", help="username to use for ssh (default: root)", type=str, default="root") + parser.add_argument("--port", help="port to use for ssh (default: 22)", type=str, default=22) + + parser.add_argument("--repo", help="git repo to clone", type=str) + parser.add_argument("path", help="path where your repo is cloned or will be cloned", type=str) args = parser.parse_args() + # Set logging level if args.verbose: logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.DEBUG) else: logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.INFO) - main.run() + # Run the main program + main.run(args) diff --git a/src/lxc/commands_utils.py b/src/lxc/commands_utils.py deleted file mode 100644 index 2a3b48a..0000000 --- a/src/lxc/commands_utils.py +++ /dev/null @@ -1,411 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import TYPE_CHECKING - -from ..utils import proxmox_utils -from ..utils.resources_utils import get_path - -if TYPE_CHECKING: - from . import LXC - - -def run_script(lxc: LXC, step: dict): - """Method use to dispatch the script to the correct method depending on the type of script, being - local (on the machine running this program), remote (url) or in the LXC. - - Parameters - ---------- - lxc : LXC - The LXC object used to run the script - step: dict - Dictionary containing the step information and configuration about the script to run - Typically read from the "creation/steps/" in JSON file - """ - - # Install bash if not installed - # Sometimes only ash or sh are installed, which doesn't work for some scripts - if not lxc.has_program("bash"): - install_package(lxc, "bash") - - # Run local script - if "local_path" in step: - path = get_path(lxc, step["local_path"]) - _run_local_script_on_lxc(lxc, path) - - # Run remote script - elif "url" in step: - _run_remote_script_on_lxc(lxc, step["url"]) - - # Run script in LXC - elif "lxc_path" in step: - path = get_path(lxc, step["lxc_path"]) - _run_script_on_lxc(lxc, path) - - -def _run_script_on_lxc(lxc: LXC, path: Path): - """Runs a script present inside the LXC storage, without copying or downloading it - - Parameters - ---------- - lxc: LXC - The LXC object used to run the script - path: pathlib.Path - Path to the script inside the LXC storage - """ - - lxc.run_command(f"bash {str(path.as_posix())}", shell=True) - - -def _run_local_script_on_lxc(lxc: LXC, path: Path): - """Runs a script present on the local machine running this program, inside the LXC - It works by copying the script from the local machine to the PVE using SCP and - then using "pct push" to send it to the LXC. - - Parameters - ---------- - lxc: LXC - The LXC object used to run the script - path: pathlib.Path - Path object to the script on the local machine (machine running this program) - """ - - # define the path of the script inside the LXC - script_path_in_lxc = Path("/tmp/pdj-scripts/").joinpath(path.name) - - # Copy the script to the LXC - proxmox_utils.copy_file_to_lxc(lxc, path, f"{str(script_path_in_lxc.parent.as_posix())}/") - - # and run it from there - _run_script_on_lxc(lxc, script_path_in_lxc) - - # once it has been run, remove the script from the LXC - remove_file_in_lxc(lxc, script_path_in_lxc) - - # with open(path, "r") as file: - # script = file.read() - # - # # TODO : Might work with EOF instead of copying the script to /tmp but unable to make it work - # # lxc.run_command("bash <>> run_docker_command(lxc, "", "") - """ - - lxc.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False) - - -def run_docker_compose_command(lxc: LXC, command: str, working_directory: str = None): - """Run a docker-compose command in the LXC - - Parameters - ---------- - lxc: LXC - The LXC object of the LXC to run the docker-compose command in - command: str - The docker-compose command to run - working_directory: str, optional - The working directory to run the command in - - Examples - -------- - >>> run_docker_compose_command(lxc, "up -d", "/home/user/traefik") - """ - - docker_compose_exec = "docker-compose" - if not lxc.has_program(docker_compose_exec): - docker_compose_exec = "docker compose" - - if working_directory is not None: - lxc.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True) - else: - lxc.run_command(f"{docker_compose_exec} {command}", return_status_code=True) - - -def download_file(lxc: LXC, url: str, destination: str): - """Download a file from a URL to the LXC and save it to the destination - - Parameters - ---------- - lxc: LXC - The LXC object of the LXC to download the file to - url: str - URL of the file to download - destination: str - Path to the destination to save the file to - Needs to end with a trailing slash - - Examples - -------- - >>> download_file(lxc, "https://example.com/file.zip", "/home/user/") - - """ - """Download a file from a URL to the LXC""" - if type(url) is list: - for u in url: - lxc.run_command(f"wget {u} --directory-prefix={destination}", return_status_code=True) - else: - lxc.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True) - - -def unzip_file(lxc: LXC, path: str, destination: str = None): - """Unzip a file in the LXC - - Parameters - ---------- - lxc: LXC - The LXC object of the LXC to unzip the file in - path: str - Path to the file to unzip - destination: str, optional - Path to the destination to unzip the file to - If not specified, it will unzip the file in the same directory as the file - Needs to end with a trailing slash - - Examples - -------- - >>> unzip_file(lxc, "/home/user/file.zip", "/home/user/extracted_files/") - >>> unzip_file(lxc, "/home/user/file.tar.gz", "/home/user/extracted_files/") - """ - - if destination is None: - destination = Path(path).parent - - if ".zip" in path: - lxc.run_command(f"unzip {path} -d {destination}", return_status_code=True) - elif ".tar.gz" in path: - lxc.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}", return_status_code=True) - - -def install_package(lxc: LXC, package: str or list): - """Install a package in the LXC - - Parameters - ---------- - lxc: LXC - The LXC object of the LXC to install the package in - package: str or list - Name(s) of the package(s) to install - - Examples - -------- - >>> install_package(lxc, "nginx") - >>> install_package(lxc, ["nginx", "apache2"]) - """ - - if type(package) is list: - for p in package: - lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {p}", - return_status_code=True) - else: - lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {package}", - return_status_code=True) - - -def remove_package(lxc: LXC, package: str or list): - """Remove a package in the LXC - - Parameters - ---------- - lxc: LXC - The LXC object of the LXC to remove the package in - package: str or list - Name(s) of the package(s) to remove - - Examples - -------- - >>> remove_package(lxc, "nginx") - >>> remove_package(lxc, ["nginx", "apache2"]) - """ - - if type(package) is list: - packages = [] - for p in package: - packages.append(p) - lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {' '.join(packages)}", - return_status_code=True) - - else: - lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {package}", - return_status_code=True) - - -def replace_in_files(lxc: LXC, path: str or list, search: str, replace: str, case_sensitive: bool = False): - """Replace a string in one or multiples files in the LXC - - Parameters - ---------- - lxc : LXC - The LXC object of the LXC to replace the string in - path : str or list of str - Path to the file(s) to replace the string in - search : str - String to search for - replace : str - String to replace the search string with - case_sensitive : bool, optional - Whether the search should be case sensitive or not - - Examples - -------- - >>> replace_in_files(lxc, "/home/user/file.txt", "username=root", "username=administrator" - >>> replace_in_files(lxc, ["/home/user/file1.txt", "/home/user/file2.txt"], \ - "username=root", "username=administrator", case_sensitive=True) - - """ - - if type(path) is list: - for p in path: - lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}", - return_status_code=True) - else: - lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}", - return_status_code=True) diff --git a/src/lxc/creation_utils.py b/src/lxc/creation_utils.py index 3638b32..051950a 100644 --- a/src/lxc/creation_utils.py +++ b/src/lxc/creation_utils.py @@ -4,11 +4,11 @@ import logging from pathlib import Path from typing import TYPE_CHECKING -from . import commands_utils +from ..utils import commands_utils from ..utils.resources_utils import get_path if TYPE_CHECKING: - from . import LXC + from .lxc import LXC def are_all_conditions_met(lxc: LXC): diff --git a/src/lxc/__init__.py b/src/lxc/lxc.py similarity index 53% rename from src/lxc/__init__.py rename to src/lxc/lxc.py index a45ff8f..dd6f0bc 100644 --- a/src/lxc/__init__.py +++ b/src/lxc/lxc.py @@ -1,15 +1,18 @@ import logging -from pathlib import Path -from . import creation_utils, commands_utils -from ..utils import proxmox_utils +from . import creation_utils, lxc_utils +from ..utils import commands_utils +from ..utils.machine import LinuxMachine +from ..utils.proxmox import ProxmoxHost -class LXC: +class LXC(LinuxMachine): """LXC object""" def __init__(self, lxc_id: int, lxc_hostname: str, os: dict, resources: dict, network: dict, options: dict, - creation: dict, deploy: dict): + creation: dict, deploy: dict, proxmox_host: ProxmoxHost): + super().__init__() + self.lxc_id = lxc_id self.lxc_hostname = lxc_hostname @@ -36,11 +39,14 @@ class LXC: self.options = options self.privileged = options["privileged"] self.start_on_boot = options["start_on_boot"] + self.startup_order = options["startup_order"] self.password = options["password"] self.creation = creation self.deploy = deploy + self.pve = proxmox_host + def __str__(self): return f"LXC {self.lxc_id} ({self.lxc_hostname})" @@ -99,14 +105,14 @@ class LXC: # TODO: might have to run "pveam update" before running this command on fresh install of PVE - template_name = proxmox_utils.run_command_on_pve( + template_name = self.pve.run_command( f"pveam available --section system | awk /\'{self.os_name}-{self.os_release}/\' | awk \'{{print $2}}\'") - is_template_downloaded = proxmox_utils.run_command_on_pve(command=f"pveam list local | awk /'{template_name}/'") + is_template_downloaded = self.pve.run_command(command=f"pveam list local | awk /'{template_name}/'") if is_template_downloaded == "": logging.info(f"Template {template_name} not found, downloading it...") - proxmox_utils.run_command_on_pve(command=f"pveam download local {template_name}") + self.pve.run_command(command=f"pveam download local {template_name}") return f"local:vztmpl/{template_name}" @@ -173,11 +179,7 @@ class LXC: """ if self.ipv4 == "dhcp": if self.is_running(): - if self.has_program("ip"): - return self.run_command( - command="ip addr | grep \'state UP\' -A2 | tail -n1 | awk \'{{print $2}}\' | cut -f1 -d\'/\'") - elif self.has_program("ifconfig"): - return self.run_command(command="ifconfig eth0 | awk '/inet addr/{print substr($2,6)}'") + return super().get_ipv4() return self.ipv4 @@ -237,6 +239,13 @@ class LXC: """ return self.start_on_boot + def get_startup_order(self): + """ + Get startup order + :return: startup order + """ + return self.startup_order + def get_password(self): """ Get password @@ -267,39 +276,33 @@ class LXC: Is running :return: is lxc running? (boolean) """ - return proxmox_utils.run_command_on_pve(command=f"pct list | awk '/running/ && /{self.lxc_id}/'") != "" + return self.pve.is_lxc_running(self.lxc_id) def does_exist(self): """ Does exist :return: does lxc exist? (boolean) """ - return proxmox_utils.does_lxc_exist(self.lxc_id) + return self.pve.does_lxc_exist(self.lxc_id) def start(self): """ Start LXC """ - if self.is_running(): - logging.info(f"LXC {self.lxc_id} already running, skipping start") - return - else: - logging.info(f"Starting LXC {self.lxc_id}") - proxmox_utils.run_command_on_pve(command=f"pct start {self.lxc_id}") + self.pve.start_lxc(self.lxc_id) def stop(self): """ Stop LXC """ - logging.info(f"Stopping LXC {self.lxc_id}") - proxmox_utils.run_command_on_pve(command=f"pct stop {self.lxc_id}") + self.pve.stop_lxc(self.lxc_id) def reboot(self): """ Reboot LXC """ logging.info(f"Rebooting LXC {self.lxc_id}") - proxmox_utils.run_command_on_pve(command=f"pct reboot {self.lxc_id}") + self.pve.reboot_lxc(self.lxc_id) def create(self): """ @@ -307,14 +310,15 @@ class LXC: """ if self.does_exist(): logging.info(f"LXC {self.lxc_id} already exists, skipping creation") - proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=False)) + self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=False)) else: logging.info(f"Creating LXC {self.lxc_id}") - proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=True)) + self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=True)) self.start() logging.info("Setting up SSH for LXC") + # self.run_script(script_path="protected/scripts/install-config-ssh.sh") commands_utils.run_script(lxc=self, step={"local_path": "protected/scripts/install-config-ssh.sh"}) def run_creation(self): @@ -338,43 +342,6 @@ class LXC: def deploy(self): pass - def has_program(self, program): - """ - Check if program is installed on LXC - :param program: program executable name - - :return: boolean - """ - if type(program) == str: - return self.run_command("which " + program, return_status_code=True) == 0 - elif type(program) == list: - return all((self.run_command("which " + p, return_status_code=True) == 0) for p in program) - - def has_file(self, file: str or Path): - """ - Check if file exists on LXC - :param file: file or path - - :return: boolean - """ - if isinstance(file, Path): - file = str(file.as_posix()) - - return self.run_command("test -f " + file, return_status_code=True) == 0 - - def has_directory(self, directory: str or Path): - """ - Check if directory exists on LXC - :param directory: directory path - - :return: boolean - """ - - if isinstance(directory, Path): - directory = str(directory.as_posix()) - - return self.run_command("test -d " + directory, return_status_code=True) == 0 - def run_script(self, script_path): """ Run script on LXC filesystem using bash @@ -384,7 +351,7 @@ class LXC: """ return commands_utils.run_script(self, {"lxc_path": script_path}) - def run_command(self, command: str, ssh: bool = False, return_status_code: bool = False, + def run_command(self, command: str, return_status_code: bool = False, exception_on_exit: bool = False, exception_on_empty_stdout: bool = False, working_directory: str = None, shell: bool = False): @@ -399,84 +366,6 @@ class LXC: if working_directory: command = f"cd {working_directory} && {command}" - if ssh: - return proxmox_utils.run_command_ssh(command=command, host=self.get_ipv4(), username="root", port=22, - return_status_code=return_status_code, - exception_on_exit=exception_on_exit, - exception_on_empty_stdout=exception_on_empty_stdout, - shell=shell) - else: - return proxmox_utils.run_command_on_pve(command=f"pct exec {self.lxc_id} -- {command}", - return_status_code=return_status_code, - exception_on_exit=exception_on_exit, - exception_on_empty_stdout=exception_on_empty_stdout, - shell=shell) - - def get_pct_command(self, create=True): - """ - Get pct command to create/edit LXC - :return: pct command - """ - - if create: - # Create command - pct_command = f"pct create {self.lxc_id} {self.get_os_template()} " \ - f"--hostname {self.lxc_hostname} " \ - f"--cores {self.cpu} " \ - f"--memory {self.memory} " \ - f"--swap {self.memory} " \ - f"--net0 name=eth0,bridge={self.bridge},ip={self.ipv4},hwaddr={self.mac},type=veth " \ - f"--onboot {int(self.start_on_boot == 0)} " \ - f"--ostype {self.os_name} " \ - f"--password {self.password} " \ - f"--storage {self.storage} " \ - f"--unprivileged {not self.privileged} " \ - f"--rootfs volume={self.storage}:{self.disk},size={self.disk} " \ - f"--ssh-public-keys /root/.ssh/id_rsa.pub " \ - f"--unprivileged {not self.privileged}" - else: - # Update command - pct_command = f"pct set {self.lxc_id} " \ - f"--hostname {self.lxc_hostname} " \ - f"--cores {self.cpu} " \ - f"--memory {self.memory} " \ - f"--swap {self.memory} " \ - f"--net0 name=eth0,bridge={self.bridge},ip={self.ipv4},hwaddr={self.mac},type=veth " \ - f"--onboot {int(self.start_on_boot == 0)} " \ - f"--ostype {self.os_name} " - - # TODO: add gateway4 - # f"ip6={self.ipv6},gw6={self.gateway6},trunks={self.vlan} " \ # TODO - return pct_command - - def get_tteck_env_variables(self): - """ - Get TTECK environment variables to run scripts silently - ! Deprecated for now ! - - :return: environment variables - """ - - env_variables = { - "CT_TYPE": "1", - "PW": self.password, - "CT_ID": self.lxc_id, - "HN": self.lxc_hostname, - "DISK_SIZE": self.disk, - "CORE_COUNT": self.cpu, - "RAM_SIZE": self.memory, - "BRG": self.bridge, - "NET": self.ipv4, - "GATE": self.gateway4, - "DISABLEIP6": "no", - "MTU": "", - "SD": "", - "NS": "", - "MAC": self.mac, - "VLAN": self.vlan, - # "SSH": self.ssh, - "VERB": "no" - } - - env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()]) - return env_command + self.pve.run_command(command=f"pct exec {self.lxc_id} -- {command}", return_status_code=return_status_code, + exception_on_exit=exception_on_exit, + exception_on_empty_stdout=exception_on_empty_stdout) diff --git a/src/lxc/lxc_utils.py b/src/lxc/lxc_utils.py new file mode 100644 index 0000000..113b8c9 --- /dev/null +++ b/src/lxc/lxc_utils.py @@ -0,0 +1,114 @@ +import json +from pathlib import Path + +from .lxc import LXC + +lxcs = [] + + +def get_all_lxcs(): + return lxcs + + +def load_lxc(lxc_id: int, filepath: Path): + """Load LXC from JSON file + + Parameters + ---------- + lxc_id : int + ID of the LXC to load + filepath : pathlib.Path + Path object to the JSON file of the LXC to load + + Examples + -------- + >>> load_lxc(100, Path("/resources/lxc/100/config.json")) + """ + + # Load JSON data + data = json.loads(filepath.read_text()) + + # Extract values from JSON + lxc_id = lxc_id + lxc_hostname = data["lxc_hostname"] + os = data["os"] + resources = data["resources"] + network = data["network"] + options = data["options"] + creation = data["creation"] + deploy = data["deploy"] + + # Create LXC object + lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy) + lxcs.append(lxc) + + +def get_tteck_env_variables(self): + """ + Get TTECK environment variables to run scripts silently + ! Deprecated for now ! + + :return: environment variables + """ + + env_variables = { + "CT_TYPE": "1", + "PW": self.password, + "CT_ID": self.lxc_id, + "HN": self.lxc_hostname, + "DISK_SIZE": self.disk, + "CORE_COUNT": self.cpu, + "RAM_SIZE": self.memory, + "BRG": self.bridge, + "NET": self.ipv4, + "GATE": self.gateway4, + "DISABLEIP6": "no", + "MTU": "", + "SD": "", + "NS": "", + "MAC": self.mac, + "VLAN": self.vlan, + # "SSH": self.ssh, + "VERB": "no" + } + + env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()]) + return env_command + + +def generate_pct_command_for_lxc(lxc: LXC, create: bool = True): + """ + Get pct command to create/edit LXC + :return: pct command + """ + + if create: + # Create command + pct_command = f"pct create {lxc.get_id()} {lxc.get_os_template()} " \ + f"--hostname {lxc.get_hostname()} " \ + f"--cores {lxc.get_cpu()} " \ + f"--memory {lxc.get_memory()} " \ + f"--swap {lxc.get_swap()} " \ + f"--net0 name=eth0,bridge={lxc.get_bridge()},ip={lxc.get_ipv4()},hwaddr={lxc.get_mac()},type=veth " \ + f"--onboot {int(lxc.is_start_on_boot())} " \ + f"--ostype {lxc.get_os_name()} " \ + f"--password {lxc.get_password()} " \ + f"--storage {lxc.get_storage()} " \ + f"--unprivileged {not lxc.is_privileged()} " \ + f"--rootfs volume={lxc.get_storage()}:{lxc.get_disk()},size={lxc.get_disk()} " \ + f"--ssh-public-keys /root/.ssh/id_rsa.pub " \ + f"--unprivileged {not lxc.is_privileged()}" + else: + # Update command + pct_command = f"pct set {lxc.get_id()} " \ + f"--hostname {lxc.get_hostname()} " \ + f"--cores {lxc.get_cpu()} " \ + f"--memory {lxc.get_memory()} " \ + f"--swap {lxc.get_memory()} " \ + f"--net0 name=eth0,bridge={lxc.get_bridge()},ip={lxc.get_ipv4()},hwaddr={lxc.get_mac()},type=veth " \ + f"--onboot {int(lxc.is_start_on_boot())} " \ + f"--ostype {lxc.get_os_name()} " + + # TODO: add gateway4 + # f"ip6={self.ipv6},gw6={self.gateway6},trunks={self.vlan} " \ # TODO + return pct_command diff --git a/src/lxc/utils.py b/src/lxc/utils.py deleted file mode 100644 index 2202856..0000000 --- a/src/lxc/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -import json -from pathlib import Path - -from . import LXC - -lxcs = [] - - -def get_all_lxcs(): - """Get all LXC objects - - Returns - ------- - list - List of all loaded LXC objects - """ - - return lxcs - - -def get_lxc(lxc_id: int): - """Get LXC by ID - - Parameters - ---------- - lxc_id : int - ID of the LXC to get - - Returns - ------- - LXC - LXC object - """ - - for lxc in lxcs: - if lxc.get_id() == lxc_id: - return lxc - - return None - - -def load_lxc(lxc_id: int, filepath: Path): - """Load LXC from JSON file - - Parameters - ---------- - lxc_id : int - ID of the LXC to load - filepath : pathlib.Path - Path object to the JSON file of the LXC to load - - Examples - -------- - >>> load_lxc(100, Path("/resources/lxc/100/config.json")) - """ - - # Load JSON data - data = json.loads(filepath.read_text()) - - # Extract values from JSON - lxc_id = lxc_id - lxc_hostname = data["lxc_hostname"] - os = data["os"] - resources = data["resources"] - network = data["network"] - options = data["options"] - creation = data["creation"] - deploy = data["deploy"] - - # Create LXC object - lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy) - lxcs.append(lxc) diff --git a/src/main.py b/src/main.py index 923f9f7..79dab93 100644 --- a/src/main.py +++ b/src/main.py @@ -1,21 +1,45 @@ import logging -from pathlib import Path +from pathlib import PosixPath, PurePosixPath, Path from . import project_path -from .lxc.utils import load_lxc, get_all_lxcs +from .lxc.lxc_utils import load_lxc, get_all_lxcs +from .utils import git_utils +from .utils.proxmox import ProxmoxHost -def run(): +def run(args): + # Check if running local or SSH + if args.host is None: + pve = ProxmoxHost(path=Path(args.path)) + else: + pve = ProxmoxHost(host=args.host, user=args.username, port=args.port, path=Path(args.path)) + pve.connect() + + if args.repo is None: + logging.warning("No repo provided, skipping updates/cloning...") + else: + # Check if the repo is already cloned + if not pve.has_directory(pve.repo_path): + logging.info(f"Cloning repository {args.repo} to {args.path}...") + git_utils.clone_repo(url=args.repo, path=args.path, linux_machine=pve) + elif pve.has_directory(args.path.joinpath(".git")): + logging.info(f"Repository already cloned at {args.path}, updating...") + git_utils.update_repo(path=args.path, linux_machine=pve) + + if not pve.has_directory(pve.repo_path): + raise FileNotFoundError(f"Repo not found at {pve.repo_path}") + # Read all files in the resources directory - resources = Path(project_path).joinpath("resources").glob("*") + resources = pve.list_dir(args.path) + # resources = Path(project_path).joinpath("resources").glob("*") # Go through each LXC file for resource in resources: if resource.name == "lxc": # Read all files in the LXC directory - lxc_folders = Path(project_path).joinpath("resources", "lxc").glob("*") + lxc_folders = PosixPath(project_path).joinpath("resources", "lxc").glob("*") for lxc_folder in lxc_folders: - lxc_file = Path(project_path).joinpath("resources", "lxc", lxc_folder, "config.json") + lxc_file = PosixPath(project_path).joinpath("resources", "lxc", lxc_folder, "config.json") load_lxc(filepath=lxc_file, lxc_id=int(lxc_folder.name)) diff --git a/src/utils/commands_utils.py b/src/utils/commands_utils.py new file mode 100644 index 0000000..0024773 --- /dev/null +++ b/src/utils/commands_utils.py @@ -0,0 +1,195 @@ +from pathlib import PurePosixPath + +from src.utils import utils +from src.utils.machine import LinuxMachine + + +def run_docker_command(linux_machine: LinuxMachine, container: str, command: str): + """Run a command inside a docker container on a linux host + + Parameters + ---------- + linux_machine : LinuxMachine + The LinuxMachine object of the linux host to run the command in + container : str + Name of the docker container to run the command in + command : str + Command to run in the docker container + + Examples + -------- + >>> run_docker_command(linux_machine, "", "") + """ + + linux_machine.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False) + + +def run_docker_compose_command(linux_machine: LinuxMachine, command: str, working_directory: str = None): + """Run a docker-compose command on a linux host + + Parameters + ---------- + linux_machine: LinuxMachine + The LinuxMachine object of the linux host to run the command in + command: str + The docker-compose command to run + working_directory: str, optional + The working directory to run the command in + + Examples + -------- + >>> run_docker_compose_command(linux_machine, "up -d", "/home/user/traefik") + """ + + docker_compose_exec = "docker-compose" + if not linux_machine.has_program(docker_compose_exec): + docker_compose_exec = "docker compose" + + if working_directory is not None: + linux_machine.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True) + else: + linux_machine.run_command(f"{docker_compose_exec} {command}", return_status_code=True) + + +def download_file(linux_machine: LinuxMachine, url: str, destination: str): + """Download a file from a URL to the Linux Machine and save it to the destination + + Parameters + ---------- + linux_machine: LinuxMachine + The LinuxMachine object of the linux host to download the file to + url: str + URL of the file to download + destination: str + Path to the destination to save the file to + Needs to end with a trailing slash + + Examples + -------- + >>> download_file(linux_machine, "https://example.com/file.zip", "/home/user/") + + """ + if type(url) is list: + for u in url: + linux_machine.run_command(f"wget {u} --directory-prefix={destination}", return_status_code=True) + else: + linux_machine.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True) + + +def unzip_file(linux_machine: LinuxMachine, path: str, destination: str = None): + """Unzip a file + + Parameters + ---------- + linux_machine: LinuxMachine + The LinuxMachine object of the linux host to unzip the file in + path: str + Path to the file to unzip + destination: str, optional + Path to the destination to unzip the file to + If not specified, it will unzip the file in the same directory as the file + Needs to end with a trailing slash + + Examples + -------- + >>> unzip_file(linux_machine, "/home/user/file.zip", "/home/user/extracted_files/") + >>> unzip_file(linux_machine, "/home/user/file.tar.gz", "/home/user/extracted_files/") + """ + + if destination is None: + destination = PurePosixPath(path).parent + + if ".zip" in path: + linux_machine.run_command(f"unzip {path} -d {destination}", return_status_code=True) + elif ".tar.gz" in path: + linux_machine.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}", + return_status_code=True) + + +def install_package(linux_machine: LinuxMachine, package: str or list): + """Install a package in the Linux Machine + + Parameters + ---------- + linux_machine: LinuxMachine + The LinuxMachine object of the host to install the package in + package: str or list + Name(s) of the package(s) to install + + Examples + -------- + >>> install_package(linux_machine, "nginx") + >>> install_package(linux_machine, ["nginx", "apache2"]) + """ + + if type(package) is list: + for p in package: + linux_machine.run_command(f"{utils.get_install_package_command(linux_machine.get_os_name())} {package}", + return_status_code=True) + else: + linux_machine.run_command(f"{utils.get_install_package_command(linux_machine.get_os_name())} {package}", + return_status_code=True) + + +def remove_package(linux_machine: LinuxMachine, package: str or list): + """Remove a package in the Linux Machine + + Parameters + ---------- + linux_machine: LinuxMachine + The LinuxMachine object of the host to remove the package in + package: str or list + Name(s) of the package(s) to remove + + Examples + -------- + >>> remove_package(linux_machine, "nginx") + >>> remove_package(linux_machine, ["nginx", "apache2"]) + """ + + if type(package) is list: + packages = [] + for p in package: + packages.append(p) + + linux_machine.run_command( + f"{utils.get_remove_package_command(linux_machine.get_os_name())} {' '.join(packages)}", + return_status_code=True) + + else: + linux_machine.run_command(f"{utils.get_remove_package_command(linux_machine.get_os_name())} {package}", + return_status_code=True) + + +def replace_in_files(linux_machine: LinuxMachine, path: str or list, search: str, replace: str, + case_sensitive: bool = False): + """Replace a string in one or multiples files in a LinuxMachine + + Parameters + ---------- + linux_machine : LinuxMachine + The LinuxMachine object of the host to replace the string in + path : str or list of str + Path to the file(s) to replace the string in + search : str + String to search for + replace : str + String to replace the search string with + case_sensitive : bool, optional + Whether the search should be case sensitive or not + + Examples + -------- + >>> replace_in_files(linux_machine, "/home/user/file.txt", "username=root", "username=administrator" + >>> replace_in_files(linux_machine, ["/home/user/file1.txt", "/home/user/file2.txt"], \ + "username=root", "username=administrator", case_sensitive=True) + + """ + + if type(path) is list: + for p in path: + linux_machine.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}", + return_status_code=True) + else: + linux_machine.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}", + return_status_code=True) diff --git a/src/utils/git_utils.py b/src/utils/git_utils.py new file mode 100644 index 0000000..3e1ec81 --- /dev/null +++ b/src/utils/git_utils.py @@ -0,0 +1,15 @@ +import logging + +from src.utils.machine import LinuxMachine + + +def clone_repo(url, path, linux_machine: LinuxMachine): + """Clone the given repository to the given path""" + logging.info(f"Cloning repository {url} to {path}...") + linux_machine.run_command(f"git clone {url} {path}") + + +def update_repo(path, linux_machine: LinuxMachine): + """Update the given repository""" + logging.info(f"Updating repository at {path}...") + linux_machine.run_command(f"git -C {path} pull") diff --git a/src/utils/machine.py b/src/utils/machine.py new file mode 100644 index 0000000..0c9efdd --- /dev/null +++ b/src/utils/machine.py @@ -0,0 +1,118 @@ +from pathlib import Path + + +class LinuxMachine(): + + def __init__(self): + pass + + def get_hostname(self): + return self.run_command("hostname") + + def get_uptime(self): + return self.run_command("uptime -p") + + def get_os_name(self): + """Get OS name""" + return self.run_command("""cat /etc/os-release | grep -E '^NAME=' | cut -d '=' -f 2 | tr -d '"'""") + + def get_memory(self): + """Get memory""" + return self.run_command("free -m | grep Mem | awk '{print $2}'") + + def get_ipv4(self): + if self.has_program("ip"): + return self.run_command("""ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'""") + elif self.has_program("ifconfig"): + return self.run_command(command="ifconfig eth0 | awk '/inet addr/{print substr($2,6)}'") + + def get_ipv6(self): + pass + + def get_mac(self): + """Get MAC address""" + return self.run_command("""cat /sys/class/net/$(ip route show default | awk '/default/ {print $5}')/address""") + + def start(self): + pass + + def stop(self): + pass + + def reboot(self): + pass + + def has_program(self, program: str): + """Check if program is installed on LXC + :param program: program executable name + + :return: boolean + """ + if type(program) == str: + return self.run_command("which " + program, return_status_code=True) == 0 + elif type(program) == list: + return all((self.run_command("which " + p, return_status_code=True) == 0) for p in program) + + def has_file(self, file: str or Path): + """Check if file exists on LXC + :param file: file or path + + :return: boolean + """ + if isinstance(file, Path): + file = str(file.as_posix()) + + return self.run_command("test -f " + file, return_status_code=True) == 0 + + def has_directory(self, directory: str or Path): + """Check if directory exists on LXC + :param directory: directory path + + :return: boolean + """ + + if isinstance(directory, Path): + directory = str(directory.as_posix()) + + return self.run_command("test -d " + directory, return_status_code=True) == 0 + + def run_command(self, command, **kwargs): + pass + + def run_script(self, script: str or Path): + pass + + def list_dir(self, directory: str or Path): + pass + + def create_file(self, file: str or Path, permission: int = 644): + """Create file""" + if isinstance(file, Path): + file = str(file.as_posix()) + + self.run_command(f"touch {file}", return_status_code=True) + if permission != 644: + self.run_command(f"chmod {permission} {file}", return_status_code=True) + + def create_directory(self, directory: str or Path, permission: int = 755): + """Create directory""" + if isinstance(directory, Path): + directory = str(directory.as_posix()) + + self.run_command(f"mkdir -p {directory}", return_status_code=True) + if permission != 755: + self.run_command(f"chmod -R {permission} {directory}", return_status_code=True) + + def delete_file(self, file: str or Path): + """Delete file""" + if isinstance(file, Path): + file = str(file.as_posix()) + + self.run_command(f"rm {file}", return_status_code=True) + + def delete_directory(self, directory: str or Path): + """Delete directory""" + if isinstance(directory, Path): + directory = str(directory.as_posix()) + + self.run_command(f"rm -rf {directory}", return_status_code=True) diff --git a/src/utils/proxmox.py b/src/utils/proxmox.py new file mode 100644 index 0000000..5f03da5 --- /dev/null +++ b/src/utils/proxmox.py @@ -0,0 +1,248 @@ +from __future__ import annotations + +import fnmatch +import subprocess +from pathlib import Path, PosixPath +from typing import TYPE_CHECKING + +from fabric import Connection + +from src.utils.machine import LinuxMachine + +if TYPE_CHECKING: + from ..lxc.lxc import LXC + + +class ProxmoxHost(LinuxMachine): + """Class to represent a Proxmox host.""" + + def __init__(self, host: str = None, user: str = "root", port: int = 22, path: Path = None): + """Initialize a Proxmox host. + + If no host is provided, it will use the local machine as the Proxmox host. + + Parameters + ---------- + host: str, optional + hostname or IP address of the Proxmox host + user: str, optional + username to use for SSH connection (default: root) + port: int, optional + port to use for SSH connection (default: 22) + """ + + super().__init__() + + self.host = host + self.user = user + self.port = port + self.connection = None + self.repo_path = path + + def __str__(self): + return f"ProxmoxHost({self.host}, {self.user}, {self.port})" + + def __repr__(self): + return f"ProxmoxHost({self.host}, {self.user}, {self.port})" + + def connect(self): + """Connect to the Proxmox host.""" + if self.host is not None: + self.connection = Connection(host=self.host, user=self.user, port=self.port) + + def get_connection(self): + """Get the connection to the Proxmox host.""" + return self.connection + + def run_command(self, command: str, return_status_code: bool = False, exception_on_exit: bool = True, + exception_on_empty_stdout: bool = False): + """Run a command on the Proxmox VE host + The default behavior is as follows : + - runs the command on the Proxmox VE host + - throws an exception if the exit status is not 0 + - returns the stdout of the command + -- if the stdout is None or empty, throws an exception + + If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True: + - runs the command on the Proxmox VE host + - returns the status code of the command + + Parameters + ---------- + command: str + command to run + return_status_code: bool, optional + should it return the exit code and not the stdout, disables exception_on_exit + exception_on_exit: bool, optional + should an exception be thrown if the exit status code is not 0 + even though it's True by default, if you use return_status_code, it is disabled + exception_on_empty_stdout: bool, optional + should an exception be thrown if the stdout is None or empty + shell: bool, optional + should the command be run in a shell or not, see python subprocess documentation for more informations + + Returns + ------- + str + command output by default + int + command exit code if return_status_code is True + + Raises + ------ + Exception + if the exit status is not 0 and exception_on_exit is True + Exception + if the stdout is None and return_status_code is False + Exception + if the stdout is empty and exception_on_empty_stdout is True + """ + + # Check if host is None, if it is, run the command locally, else run it on the host via SSH + if self.host is None: + command = subprocess.run(command, shell=True, capture_output=True, encoding="utf-8") + elif self.connection is not None: + command = self.connection.run(command, hide=True, warn=True, encoding="utf-8") + else: + raise Exception("No host or connection provided") + + # If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception + if command.return_code != 0 and exception_on_exit and not return_status_code: + raise Exception(f"Error while running command: \n{command.stderr}") + + if return_status_code: + return command.return_code + + # Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout + if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout: + raise Exception( + f"Error, no output from command, try using the command with return_status_code instead: \n{command.stderr}") + elif command.stdout is None or command.stdout == "": + return "" + + # Decode stdout if it's bytes + if type(command.stdout) == bytes: + return command.stdout.decode().rstrip() + + return command.stdout.rstrip() + + def get_version(self): + """Get the version of the Proxmox host.""" + return self.run_command("pveversion") + + def get_all_lxcs(self): + """Get all the LXCs on the Proxmox host.""" + pct_list_output = self.run_command("pct list") + pct_list_output = pct_list_output.split("\n")[1:] + ids = [line.split()[0] for line in pct_list_output] + + if not ids: + return [] + + return ids + + def does_lxc_exist(self, lxc_id): + """Check if the given LXC exists on the Proxmox host.""" + return lxc_id in self.get_all_lxcs() + + def is_lxc_running(self, lxc_id): + """Check if the given LXC is running on the Proxmox host.""" + if not self.does_lxc_exist(lxc_id): + return False + + pct_status_output = self.run_command(f"pct status {lxc_id}") + return "running" in pct_status_output + + def start_lxc(self, lxc_id): + """Start the given LXC on the Proxmox host.""" + if not self.does_lxc_exist(lxc_id): + raise Exception(f"LXC {lxc_id} does not exist") + + if not self.is_lxc_running(lxc_id): + self.run_command(f"pct start {lxc_id}") + + def stop_lxc(self, lxc_id): + """Stop the given LXC on the Proxmox host.""" + if not self.does_lxc_exist(lxc_id): + raise Exception(f"LXC {lxc_id} does not exist") + + if self.is_lxc_running(lxc_id): + self.run_command(f"pct stop {lxc_id}") + + def reboot_lxc(self, lxc_id): + """Reboot the given LXC on the Proxmox host.""" + if not self.does_lxc_exist(lxc_id): + raise Exception(f"LXC {lxc_id} does not exist") + + self.run_command(f"pct reboot {lxc_id}") + + def get_all_vms(self): + """Get all the VMs on the Proxmox host.""" + qm_list_output = self.run_command("qm list") + qm_list_output = qm_list_output.split("\n")[1:] + ids = [line.split()[0] for line in qm_list_output] + + if not ids: + return [] + + return ids + + def does_vm_exist(self, vm_id): + """Check if the given VM exists on the Proxmox host.""" + return vm_id in self.get_all_vms() + + def is_vm_running(self, vm_id): + """Check if the given VM is running on the Proxmox host.""" + if not self.does_vm_exist(vm_id): + return False + + qm_status_output = self.run_command(f"qm status {vm_id}") + return "running" in qm_status_output + + def start_vm(self, vm_id): + """Start the given VM on the Proxmox host.""" + if not self.does_vm_exist(vm_id): + raise Exception(f"VM {vm_id} does not exist") + + if not self.is_vm_running(vm_id): + self.run_command(f"qm start {vm_id}") + + def stop_vm(self, vm_id): + """Stop the given VM on the Proxmox host.""" + if not self.does_vm_exist(vm_id): + raise Exception(f"VM {vm_id} does not exist") + + if self.is_vm_running(vm_id): + self.run_command(f"qm stop {vm_id}") + + def reboot_vm(self, vm_id): + """Reboot the given VM on the Proxmox host.""" + if not self.does_vm_exist(vm_id): + raise Exception(f"VM {vm_id} does not exist") + + self.run_command(f"qm reboot {vm_id}") + + def copy_file_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path): + """Copy the given file to the given LXC.""" + if isinstance(source, Path): + source = str(source.as_posix()) + if isinstance(destination, Path): + destination = str(destination.as_posix()) + + self.run_command(f"pct push {lxc.get_id()} {source} {destination}") + + def copy_folder_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path): + """Copy the given folder to the given LXC.""" + if isinstance(source, Path): + source = str(source.as_posix()) + if isinstance(destination, Path): + destination = str(destination.as_posix()) + + self.run_command(f"pct push {lxc.get_id()} {source} {destination} -r") + + def list_dir(self, directory: str or Path, glob_filter: str = '*'): + """List the given directory.""" + if isinstance(directory, Path): + directory = str(directory.as_posix()) + + return fnmatch.filter(self.connection.sftp().listdir(path=directory), glob_filter) diff --git a/src/utils/proxmox_utils.py b/src/utils/proxmox_utils.py deleted file mode 100644 index f1399be..0000000 --- a/src/utils/proxmox_utils.py +++ /dev/null @@ -1,456 +0,0 @@ -from __future__ import annotations - -import json -import logging -import os -import pathlib -import subprocess -from pathlib import Path -from typing import TYPE_CHECKING - -from .resources_utils import get_path -from .. import project_path -from ..lxc import commands_utils - -if TYPE_CHECKING: - from . import LXC - - -def get_pve_version(): - """Get PVE version - - Returns - ------- - str - PVE version - """ - return run_command_on_pve(command="pveversion") - - -def get_pve_hostname(): - """Get PVE hostname - - Returns - ------- - str - PVE hostname - - """ - return run_command_on_pve(command="hostname") - - -def does_lxc_exist(lxc_id: int): - """Check if an LXC exists with the given ID - - Parameters - ---------- - lxc_id: int - - Returns - ------- - bool - Does LXC exist? - """ - - # TODO: only check in VMID column - return str(lxc_id) in run_command_on_pve(command=f"pct list | awk '/{lxc_id}/'", exception_on_empty_stdout=False) - - -def does_qemu_vm_exist(vm_id: int): - """Check if a QEMU VM exists with the given ID - - Parameters - ---------- - vm_id: int - - Returns - ------- - bool - Does QEMU VM exist? - """ - - # TODO: only check in VMID column - return str(vm_id) in run_command_on_pve(command=f"qm list {vm_id}", exception_on_empty_stdout=False) - - -# def execute_tteck_script(script_url: str, env_variables: list): -# """Execute TTECK script with already filled environment variables to run silently (non-interactive) -# -# Parameters -# ---------- -# script_url: str -# script url (github or other) -# env_variables: list -# list of environment variables -# -# Returns -# ------- -# int -# status code -# """ -# -# env_variables = " ".join(env_variables) -# -# run_command_on_pve(command=f"{env_variables} && bash -c \"$(wget -qLO - {script_url}\"") - - -def get_config(): - """Get the JSON content of config.json file as a dict - - Returns - ------- - dict - JSON content of config.json file - """ - - config_file = os.path.join(project_path / "resources" / "config.json") - with open(config_file, "r") as file: - return json.loads(file.read()) - - -def run_command_on_pve(command: str, return_status_code: bool = False, exception_on_exit: bool = True, - exception_on_empty_stdout: bool = False, - force_local: bool = False, shell: bool = False): - """Run a command on the Proxmox VE host - The default behavior is as follows : - - runs the command on the Proxmox VE host - - throws an exception if the exit status is not 0 - - returns the stdout of the command - -- if the stdout is None or empty, throws an exception - - If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True: - - runs the command on the Proxmox VE host - - returns the status code of the command - - Parameters - ---------- - command: str - command to run - return_status_code: bool, optional - should it return the exit code and not the stdout, disables exception_on_exit - exception_on_exit: bool, optional - should an exception be thrown if the exit status code is not 0 - even though it's True by default, if you use return_status_code, it is disabled - exception_on_empty_stdout: bool, optional - should an exception be thrown if the stdout is None or empty - force_local: bool, optional - should it be run locally not via ssh even with local mode off in the pve section of config.json - shell: bool, optional - should the command be run in a shell or not, see python subprocess documentation for more informations - - Returns - ------- - str - command output by default - int - command exit code if return_status_code is True - - Raises - ------ - Exception - if the exit status is not 0 and exception_on_exit is True - Exception - if the stdout is None and return_status_code is False - Exception - if the stdout is empty and exception_on_empty_stdout is True - """ - - # Get config - config = get_config() - - # Check if PVE is local or remote - if config['pve']['local'] or force_local: - # Run command and return output (not as bytes) - logging.debug(f"Running command on PVE/Machine running this script (locally): {command}") - - command = subprocess.run(command, shell=shell, capture_output=True, encoding="utf-8") - - else: - host = config['pve']['host'] - username = config['pve']['user'] - port = config['pve']['port'] - - # Log command for debugging - if "pct exec" in command: - logging.debug(f"Running command on LXC {command.split(' ')[2]} through PVE (ssh): {command}") - else: - logging.debug(f"Running command on PVE (ssh): {command}") - - # catch errors code - command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=shell, capture_output=True, - encoding="utf-8") - - # If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception - if command.returncode != 0 and exception_on_exit and not return_status_code: - raise Exception(f"Error while running command: \n{command.stderr}") - - if return_status_code: - return command.returncode - - # Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout - if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout: - raise Exception( - f"Error, no output from command, try using the command with return_status_code instead: \n{command.stderr}") - elif command.stdout is None or command.stdout == "": - return "" - - # Decode stdout if it's bytes - if type(command.stdout) == bytes: - return command.stdout.decode().rstrip() - - return command.stdout.rstrip() - - -def run_command_locally(command: str, return_status_code: bool = False, exception_on_exit: bool = False): - """Force running command locally even if local mode is off in config.json - - See Also - -------- - run_command_on_pve - """ - - run_command_on_pve(command=command, exception_on_exit=exception_on_exit, return_status_code=return_status_code, - force_local=True) - - -def run_command_ssh(command: str, host: str, username: str, port: int = 22, return_status_code: bool = False, - exception_on_exit: bool = False, exception_on_empty_stdout: bool = False, shell: bool = False): - """Run command on a remote host via SSH - - Parameters - ---------- - command: str - command to run - host: str - host to run the command on - username: str - username to use to connect to the host (usually root for lxc and vm) - port: int, optional - port to use to connect to the host (default: 22) - exception_on_exit: bool, optional - should an exception be thrown if the exit status is not 0 - exception_on_empty_stdout: bool, optional - should an exception be thrown if the stdout is empty or none - return_status_code: bool, optional - should it only return the exit code and not the stdout - shell: bool, optional - should the command be run in a shell (default: True) - - Returns - ------- - str - command output by default - int - command exit code if return_status_code is True - - """ - - logging.debug(f"Running command on host {host} (ssh): {command}") - - # catch errors code - command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=True, - capture_output=True, encoding="utf-8") - - if command.returncode != 0 and exception_on_exit: - raise Exception(f"Error while running command on PVE: \n{command.stderr}") - - if return_status_code: - return command.returncode - - return command.stdout.rstrip() - - -def get_install_package_command(distribution: str): - """Retrieve the correct command to install a package based on the distribution specified - It supports all the distribution supported by Proxmox VE (from the pct command documentation). - Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE - - Parameters - ---------- - distribution: str - Name of the distribution as specific in the proxmox pct command documentation - - See Also - -------- - https://pve.proxmox.com/pve-docs/pct.1.html - - Returns - ------- - str - Beginning of the command to install a package, the package name should be appended to it - """ - - distribution = distribution.lower() - - if distribution == "debian": - return "apt-get install -y" - elif distribution == "ubuntu": - return "apt-get install -y" - elif distribution == "centos": - return "yum install -y" - elif distribution == "fedora": - return "yum install -y" - elif distribution == "gentoo": - return "emerge -a" - elif distribution == "alpine": - return "apk add" - elif distribution == "archlinux": - return "pacman -S --noconfirm" - elif distribution == "devuan": - return "apt-get install -y" - elif distribution == "nixos": - return "nix-env -i" - elif distribution == "opensuse": - return "zypper install -y" - else: - raise Exception(f"Unsupported distribution: {distribution}") - - -def get_remove_package_command(distribution: str): - """Retrieve the correct command to uninstall a package based on the distribution specified - It supports all the distribution supported by Proxmox VE (from the pct command documentation). - Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE - - Parameters - ---------- - distribution: str - Name of the distribution as specific in the proxmox pct command documentation - - See Also - -------- - https://pve.proxmox.com/pve-docs/pct.1.html - - Returns - ------- - str - Beginning of the command to uninstall a package, the package name should be appended to it - """ - - distribution = distribution.lower() - - if distribution == "debian": - return "apt-get remove -y" - elif distribution == "ubuntu": - return "apt-get remove -y" - elif distribution == "centos": - return "yum remove -y" - elif distribution == "fedora": - return "yum remove -y" - elif distribution == "gentoo": - return "emerge -C" - elif distribution == "alpine": - return "apk del" - elif distribution == "archlinux": - return "pacman -R --noconfirm" - elif distribution == "devuan": - return "apt-get remove -y" - elif distribution == "nixos": - return "nix-env -e" - elif distribution == "opensuse": - return "zypper remove -y" - else: - raise Exception(f"Unsupported distribution: {distribution}") - - -def get_pve_ssh_public_key(): - """ Retrieve the SSH public key of the machine running this script - - Returns - ------- - str - SSH public key - - Raises - ------ - Exception - If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.) - """ - - # TODO: maybe implement custom path for the key - - return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub", exception_on_exit=True) - - -def get_identity_file(): - """ Retrieve the SSH public key of the machine running this script - - Returns - ------- - str - Path to the SSH private key - - Raises - ------ - Exception - If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.) - """ - - if get_config()['settings']['identity_file']: - return get_path(lxc=None, path=get_config()['settings']['identity_file']) - elif Path("~/.ssh/id_rsa").expanduser().is_file(): - return Path("~/.ssh/id_rsa").expanduser() - else: - raise Exception("No identity file found") - - -def copy_file_to_pve(path: Path, destination: str): - """Copy a local file (on the machine running this script) to the PVE - - Parameters - ---------- - path: pathlib.Path - Path object to file on local machine - destination: str - Destination on PVE - - Examples - -------- - >>> copy_file_to_pve(Path("/home/user/file.txt"), "/root/file.txt") - """ - - # create the destination directory if it does not exist - run_command_on_pve(command=f"mkdir -p {destination.rsplit('/', 1)[0]}", return_status_code=True) - - config = get_config() - - # copy the file to the PVE from the local machine - run_command_locally(command=f"scp {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}", - return_status_code=True) - - -def copy_file_to_lxc(lxc: LXC, path: Path, destination: str or Path): - """Copy a local file (on the machine running this program) to the PVE and finally to the LXC - It works by copying the file from the local machine to the PVE using SCP and - then using "pct push" to send it to the LXC. - Sometimes SSH is not installed on freshs LXC, so by using pct push we are sure that it will work. - - Parameters - ---------- - lxc: LXC - LXC object to copy the file to - path: pathlib.Path - Path object to file on local machine - destination: str or pathlib.Path - Destination on LXC - - Examples - -------- - >>> copy_file_to_lxc(100, Path("/home/user/file.txt"), "/root/") - """ - - # define path for temp file on pve - file = path.name - temp_path = f"/tmp/pdj-temp/{file}" - - # convert destination to string if it is a Path object - if isinstance(destination, Path): - destination = str(destination.as_posix()) - - # copy file to pve from local machine - copy_file_to_pve(path, temp_path) - - # make sure to create destination folder on lxc - commands_utils.create_folder_in_lxc(lxc, destination) - - # copy/push file from pve to lxc - run_command_on_pve(f"pct push {lxc.get_id()} {temp_path} {destination}{file}", return_status_code=True) diff --git a/src/utils/resources_utils.py b/src/utils/resources_utils.py index 875bfe4..c38c2c2 100644 --- a/src/utils/resources_utils.py +++ b/src/utils/resources_utils.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING from .. import project_path if TYPE_CHECKING: - from ..lxc import LXC + from ..lxc.lxc import LXC def get_path(lxc: LXC, path: str): diff --git a/src/utils/utils.py b/src/utils/utils.py new file mode 100644 index 0000000..e41c96e --- /dev/null +++ b/src/utils/utils.py @@ -0,0 +1,90 @@ +def get_install_package_command(distribution: str): + """Retrieve the correct command to install a package based on the distribution specified + It supports all the distribution supported by Proxmox VE (from the pct command documentation). + Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE + + Parameters + ---------- + distribution: str + Name of the distribution as specific in the proxmox pct command documentation + + See Also + -------- + https://pve.proxmox.com/pve-docs/pct.1.html + + Returns + ------- + str + Beginning of the command to install a package, the package name should be appended to it + """ + + distribution = distribution.lower() + + if "debian" in distribution: + return "apt-get install -y" + elif "ubuntu" in distribution: + return "apt-get install -y" + elif "centos" in distribution: + return "yum install -y" + elif "fedora" in distribution: + return "yum install -y" + elif "gentoo" in distribution: + return "emerge -a" + elif "alpine" in distribution: + return "apk add" + elif "archlinux" in distribution: + return "pacman -S --noconfirm" + elif "devuan" in distribution: + return "apt-get install -y" + elif "nixos" in distribution: + return "nix-env -i" + elif "opensuse" in distribution: + return "zypper install -y" + else: + raise Exception(f"Unsupported distribution: {distribution}") + + +def get_remove_package_command(distribution: str): + """Retrieve the correct command to uninstall a package based on the distribution specified + It supports all the distribution supported by Proxmox VE (from the pct command documentation). + Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE + + Parameters + ---------- + distribution: str + Name of the distribution as specific in the proxmox pct command documentation + + See Also + -------- + https://pve.proxmox.com/pve-docs/pct.1.html + + Returns + ------- + str + Beginning of the command to uninstall a package, the package name should be appended to it + """ + + distribution = distribution.lower() + + if "debian" in distribution: + return "apt-get remove -y" + elif "ubuntu" in distribution: + return "apt-get remove -y" + elif "centos" in distribution: + return "yum remove -y" + elif "fedora" in distribution: + return "yum remove -y" + elif "gentoo" in distribution: + return "emerge -C" + elif "alpine" in distribution: + return "apk del" + elif "archlinux" in distribution: + return "pacman -R --noconfirm" + elif "devuan" in distribution: + return "apt-get remove -y" + elif "nixos" in distribution: + return "nix-env -e" + elif "opensuse" in distribution: + return "zypper remove -y" + else: + raise Exception(f"Unsupported distribution: {distribution}")