MASSIVE REWORK of structure include new object oriented approch with linuxmachine/pve host, etc
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Mathieu Broillet 2023-06-15 16:25:47 +02:00
parent 19487527f9
commit e654102176
No known key found for this signature in database
GPG Key ID: 7D4F25BC50A0AA32
15 changed files with 859 additions and 1095 deletions

View File

@ -0,0 +1,2 @@
fabric~=3.1.0
patchwork~=1.0.1

12
run.py
View File

@ -4,15 +4,23 @@ import logging
from src import main
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description="Deploy JSON file as LXCs and VMs to your proxmox server")
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
parser.add_argument("--host", help="host to use for ssh", type=str)
parser.add_argument("--username", help="username to use for ssh (default: root)", type=str, default="root")
parser.add_argument("--port", help="port to use for ssh (default: 22)", type=str, default=22)
parser.add_argument("--repo", help="git repo to clone", type=str)
parser.add_argument("path", help="path where your repo is cloned or will be cloned", type=str)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.DEBUG)
else:
logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.INFO)
main.run()
# Run the main program
main.run(args)

View File

@ -1,411 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from ..utils import proxmox_utils
from ..utils.resources_utils import get_path
if TYPE_CHECKING:
from . import LXC
def run_script(lxc: LXC, step: dict):
"""Method use to dispatch the script to the correct method depending on the type of script, being
local (on the machine running this program), remote (url) or in the LXC.
Parameters
----------
lxc : LXC
The LXC object used to run the script
step: dict
Dictionary containing the step information and configuration about the script to run
Typically read from the "creation/steps/<step>" in JSON file
"""
# Install bash if not installed
# Sometimes only ash or sh are installed, which doesn't work for some scripts
if not lxc.has_program("bash"):
install_package(lxc, "bash")
# Run local script
if "local_path" in step:
path = get_path(lxc, step["local_path"])
_run_local_script_on_lxc(lxc, path)
# Run remote script
elif "url" in step:
_run_remote_script_on_lxc(lxc, step["url"])
# Run script in LXC
elif "lxc_path" in step:
path = get_path(lxc, step["lxc_path"])
_run_script_on_lxc(lxc, path)
def _run_script_on_lxc(lxc: LXC, path: Path):
"""Runs a script present inside the LXC storage, without copying or downloading it
Parameters
----------
lxc: LXC
The LXC object used to run the script
path: pathlib.Path
Path to the script inside the LXC storage
"""
lxc.run_command(f"bash {str(path.as_posix())}", shell=True)
def _run_local_script_on_lxc(lxc: LXC, path: Path):
"""Runs a script present on the local machine running this program, inside the LXC
It works by copying the script from the local machine to the PVE using SCP and
then using "pct push" to send it to the LXC.
Parameters
----------
lxc: LXC
The LXC object used to run the script
path: pathlib.Path
Path object to the script on the local machine (machine running this program)
"""
# define the path of the script inside the LXC
script_path_in_lxc = Path("/tmp/pdj-scripts/").joinpath(path.name)
# Copy the script to the LXC
proxmox_utils.copy_file_to_lxc(lxc, path, f"{str(script_path_in_lxc.parent.as_posix())}/")
# and run it from there
_run_script_on_lxc(lxc, script_path_in_lxc)
# once it has been run, remove the script from the LXC
remove_file_in_lxc(lxc, script_path_in_lxc)
# with open(path, "r") as file:
# script = file.read()
#
# # TODO : Might work with EOF instead of copying the script to /tmp but unable to make it work
# # lxc.run_command("bash <<EOF\n" + script + "\nEOF")
def _run_remote_script_on_lxc(lxc: LXC, url: str):
"""Runs a script from a URL on a LXC
Usually this should not be called directly but rather through the run_script() method with the
according step configuration (local_path, url or lxc_path) which is itself usually read from the JSON file.
Parameters
----------
lxc: LXC
The LXC object used to run the script
url: str
URL of the script to run
"""
if lxc.has_program("curl"):
lxc.run_command(f"curl -sSL {url} | bash", shell=True)
def create_folder_in_lxc(lxc: LXC, path: str or Path, permission=755):
"""Create a folder in the LXC
It's possible to specify the permission of the folder, by default it uses 755
Parameters
----------
lxc: LXC
The LXC object of the LXC to create the folder in
path: str or pathlib.Path
Path to the folder to create in the LXC
permission: int, optional
Permission of the folder to create in the LXC (default is 755)
"""
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"mkdir -p {path}", return_status_code=True)
if permission != 755:
lxc.run_command(f"chmod -R {permission} {path}", return_status_code=True)
def remove_folder_in_lxc(lxc: LXC, path: str or Path):
"""Remove a folder in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to remove the folder in
path: str or pathlib.Path
Path to the folder to remove in the LXC
"""
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"rm -rf {path}", return_status_code=True)
def create_file_in_lxc(lxc: LXC, path: str or Path, permission=644):
"""Create a file in the LXC
It's possible to specify the permission of the file, by default it uses 644
Parameters
----------
lxc: LXC
The LXC object of the LXC to create the file in
path: str or pathlib.Path
Path to the file to create in the LXC
permission: int, optional
Permission of the file to create in the LXC (default is 644)
"""
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"touch {path}", return_status_code=True)
if permission != 644:
lxc.run_command(f"chmod {permission} {path}", return_status_code=True)
def remove_file_in_lxc(lxc: LXC, path: str or Path):
"""Remove a file in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to remove the file in
path: str or pathlib.Path
Path to the file to remove in the LXC
"""
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"rm {path}", return_status_code=True)
def copy_local_file_to_lxc(lxc: LXC, path: Path, destination: str):
"""Copy a local file to the LXC
Parameters
----------
lxc : LXC
The LXC object of the LXC to copy the file to
path: pathlib.Path
Path object to the file on the machine running this program
destination: str
Path to the destination in the LXC
"""
command = proxmox_utils.run_command_locally(command=f"scp -B {str(path)} {lxc.get_ssh_string()}:{destination}",
return_status_code=True)
if command.returncode != 0:
raise Exception(f"Unable to copy file {str(path)} to LXC {lxc.get_id()} at {destination}, probably SSH key issue")
def copy_local_folder_to_lxc(lxc: LXC, path: Path, destination: str):
"""Copy a local folder to the LXC
Parameters
----------
lxc : LXC
The LXC object of the LXC to copy the folder to
path: pathlib.Path
Path object to the folder on the machine running this program
destination: str
Path to the destination in the LXC
"""
command = proxmox_utils.run_command_locally(command=f"scp -B -r {str(path)} {lxc.get_ssh_string()}:{destination}",
return_status_code=True)
if command.returncode != 0:
raise Exception(f"Unable to copy file {str(path)} to LXC {lxc.get_id()} at {destination}, probably SSH key issue")
def run_docker_command(lxc: LXC, container: str, command: str):
"""Run a command inside a docker container on the LXC
Parameters
----------
lxc : LXC
The LXC object of the LXC to access the docker container on
container : str
Name of the docker container to run the command in
command : str
Command to run in the docker container
Examples
--------
>>> run_docker_command(lxc, "<container-name>", "<command>")
"""
lxc.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False)
def run_docker_compose_command(lxc: LXC, command: str, working_directory: str = None):
"""Run a docker-compose command in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to run the docker-compose command in
command: str
The docker-compose command to run
working_directory: str, optional
The working directory to run the command in
Examples
--------
>>> run_docker_compose_command(lxc, "up -d", "/home/user/traefik")
"""
docker_compose_exec = "docker-compose"
if not lxc.has_program(docker_compose_exec):
docker_compose_exec = "docker compose"
if working_directory is not None:
lxc.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True)
else:
lxc.run_command(f"{docker_compose_exec} {command}", return_status_code=True)
def download_file(lxc: LXC, url: str, destination: str):
"""Download a file from a URL to the LXC and save it to the destination
Parameters
----------
lxc: LXC
The LXC object of the LXC to download the file to
url: str
URL of the file to download
destination: str
Path to the destination to save the file to
Needs to end with a trailing slash
Examples
--------
>>> download_file(lxc, "https://example.com/file.zip", "/home/user/")
"""
"""Download a file from a URL to the LXC"""
if type(url) is list:
for u in url:
lxc.run_command(f"wget {u} --directory-prefix={destination}", return_status_code=True)
else:
lxc.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True)
def unzip_file(lxc: LXC, path: str, destination: str = None):
"""Unzip a file in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to unzip the file in
path: str
Path to the file to unzip
destination: str, optional
Path to the destination to unzip the file to
If not specified, it will unzip the file in the same directory as the file
Needs to end with a trailing slash
Examples
--------
>>> unzip_file(lxc, "/home/user/file.zip", "/home/user/extracted_files/")
>>> unzip_file(lxc, "/home/user/file.tar.gz", "/home/user/extracted_files/")
"""
if destination is None:
destination = Path(path).parent
if ".zip" in path:
lxc.run_command(f"unzip {path} -d {destination}", return_status_code=True)
elif ".tar.gz" in path:
lxc.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}", return_status_code=True)
def install_package(lxc: LXC, package: str or list):
"""Install a package in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to install the package in
package: str or list
Name(s) of the package(s) to install
Examples
--------
>>> install_package(lxc, "nginx")
>>> install_package(lxc, ["nginx", "apache2"])
"""
if type(package) is list:
for p in package:
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {p}",
return_status_code=True)
else:
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {package}",
return_status_code=True)
def remove_package(lxc: LXC, package: str or list):
"""Remove a package in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to remove the package in
package: str or list
Name(s) of the package(s) to remove
Examples
--------
>>> remove_package(lxc, "nginx")
>>> remove_package(lxc, ["nginx", "apache2"])
"""
if type(package) is list:
packages = []
for p in package:
packages.append(p)
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {' '.join(packages)}",
return_status_code=True)
else:
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {package}",
return_status_code=True)
def replace_in_files(lxc: LXC, path: str or list, search: str, replace: str, case_sensitive: bool = False):
"""Replace a string in one or multiples files in the LXC
Parameters
----------
lxc : LXC
The LXC object of the LXC to replace the string in
path : str or list of str
Path to the file(s) to replace the string in
search : str
String to search for
replace : str
String to replace the search string with
case_sensitive : bool, optional
Whether the search should be case sensitive or not
Examples
--------
>>> replace_in_files(lxc, "/home/user/file.txt", "username=root", "username=administrator"
>>> replace_in_files(lxc, ["/home/user/file1.txt", "/home/user/file2.txt"], \
"username=root", "username=administrator", case_sensitive=True)
"""
if type(path) is list:
for p in path:
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}",
return_status_code=True)
else:
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}",
return_status_code=True)

View File

@ -4,11 +4,11 @@ import logging
from pathlib import Path
from typing import TYPE_CHECKING
from . import commands_utils
from ..utils import commands_utils
from ..utils.resources_utils import get_path
if TYPE_CHECKING:
from . import LXC
from .lxc import LXC
def are_all_conditions_met(lxc: LXC):

View File

@ -1,15 +1,18 @@
import logging
from pathlib import Path
from . import creation_utils, commands_utils
from ..utils import proxmox_utils
from . import creation_utils, lxc_utils
from ..utils import commands_utils
from ..utils.machine import LinuxMachine
from ..utils.proxmox import ProxmoxHost
class LXC:
class LXC(LinuxMachine):
"""LXC object"""
def __init__(self, lxc_id: int, lxc_hostname: str, os: dict, resources: dict, network: dict, options: dict,
creation: dict, deploy: dict):
creation: dict, deploy: dict, proxmox_host: ProxmoxHost):
super().__init__()
self.lxc_id = lxc_id
self.lxc_hostname = lxc_hostname
@ -36,11 +39,14 @@ class LXC:
self.options = options
self.privileged = options["privileged"]
self.start_on_boot = options["start_on_boot"]
self.startup_order = options["startup_order"]
self.password = options["password"]
self.creation = creation
self.deploy = deploy
self.pve = proxmox_host
def __str__(self):
return f"LXC {self.lxc_id} ({self.lxc_hostname})"
@ -99,14 +105,14 @@ class LXC:
# TODO: might have to run "pveam update" before running this command on fresh install of PVE
template_name = proxmox_utils.run_command_on_pve(
template_name = self.pve.run_command(
f"pveam available --section system | awk /\'{self.os_name}-{self.os_release}/\' | awk \'{{print $2}}\'")
is_template_downloaded = proxmox_utils.run_command_on_pve(command=f"pveam list local | awk /'{template_name}/'")
is_template_downloaded = self.pve.run_command(command=f"pveam list local | awk /'{template_name}/'")
if is_template_downloaded == "":
logging.info(f"Template {template_name} not found, downloading it...")
proxmox_utils.run_command_on_pve(command=f"pveam download local {template_name}")
self.pve.run_command(command=f"pveam download local {template_name}")
return f"local:vztmpl/{template_name}"
@ -173,11 +179,7 @@ class LXC:
"""
if self.ipv4 == "dhcp":
if self.is_running():
if self.has_program("ip"):
return self.run_command(
command="ip addr | grep \'state UP\' -A2 | tail -n1 | awk \'{{print $2}}\' | cut -f1 -d\'/\'")
elif self.has_program("ifconfig"):
return self.run_command(command="ifconfig eth0 | awk '/inet addr/{print substr($2,6)}'")
return super().get_ipv4()
return self.ipv4
@ -237,6 +239,13 @@ class LXC:
"""
return self.start_on_boot
def get_startup_order(self):
"""
Get startup order
:return: startup order
"""
return self.startup_order
def get_password(self):
"""
Get password
@ -267,39 +276,33 @@ class LXC:
Is running
:return: is lxc running? (boolean)
"""
return proxmox_utils.run_command_on_pve(command=f"pct list | awk '/running/ && /{self.lxc_id}/'") != ""
return self.pve.is_lxc_running(self.lxc_id)
def does_exist(self):
"""
Does exist
:return: does lxc exist? (boolean)
"""
return proxmox_utils.does_lxc_exist(self.lxc_id)
return self.pve.does_lxc_exist(self.lxc_id)
def start(self):
"""
Start LXC
"""
if self.is_running():
logging.info(f"LXC {self.lxc_id} already running, skipping start")
return
else:
logging.info(f"Starting LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct start {self.lxc_id}")
self.pve.start_lxc(self.lxc_id)
def stop(self):
"""
Stop LXC
"""
logging.info(f"Stopping LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct stop {self.lxc_id}")
self.pve.stop_lxc(self.lxc_id)
def reboot(self):
"""
Reboot LXC
"""
logging.info(f"Rebooting LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct reboot {self.lxc_id}")
self.pve.reboot_lxc(self.lxc_id)
def create(self):
"""
@ -307,14 +310,15 @@ class LXC:
"""
if self.does_exist():
logging.info(f"LXC {self.lxc_id} already exists, skipping creation")
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=False))
self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=False))
else:
logging.info(f"Creating LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=True))
self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=True))
self.start()
logging.info("Setting up SSH for LXC")
# self.run_script(script_path="protected/scripts/install-config-ssh.sh")
commands_utils.run_script(lxc=self, step={"local_path": "protected/scripts/install-config-ssh.sh"})
def run_creation(self):
@ -338,43 +342,6 @@ class LXC:
def deploy(self):
pass
def has_program(self, program):
"""
Check if program is installed on LXC
:param program: program executable name
:return: boolean
"""
if type(program) == str:
return self.run_command("which " + program, return_status_code=True) == 0
elif type(program) == list:
return all((self.run_command("which " + p, return_status_code=True) == 0) for p in program)
def has_file(self, file: str or Path):
"""
Check if file exists on LXC
:param file: file or path
:return: boolean
"""
if isinstance(file, Path):
file = str(file.as_posix())
return self.run_command("test -f " + file, return_status_code=True) == 0
def has_directory(self, directory: str or Path):
"""
Check if directory exists on LXC
:param directory: directory path
:return: boolean
"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
return self.run_command("test -d " + directory, return_status_code=True) == 0
def run_script(self, script_path):
"""
Run script on LXC filesystem using bash
@ -384,7 +351,7 @@ class LXC:
"""
return commands_utils.run_script(self, {"lxc_path": script_path})
def run_command(self, command: str, ssh: bool = False, return_status_code: bool = False,
def run_command(self, command: str, return_status_code: bool = False,
exception_on_exit: bool = False,
exception_on_empty_stdout: bool = False,
working_directory: str = None, shell: bool = False):
@ -399,84 +366,6 @@ class LXC:
if working_directory:
command = f"cd {working_directory} && {command}"
if ssh:
return proxmox_utils.run_command_ssh(command=command, host=self.get_ipv4(), username="root", port=22,
return_status_code=return_status_code,
self.pve.run_command(command=f"pct exec {self.lxc_id} -- {command}", return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout,
shell=shell)
else:
return proxmox_utils.run_command_on_pve(command=f"pct exec {self.lxc_id} -- {command}",
return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout,
shell=shell)
def get_pct_command(self, create=True):
"""
Get pct command to create/edit LXC
:return: pct command
"""
if create:
# Create command
pct_command = f"pct create {self.lxc_id} {self.get_os_template()} " \
f"--hostname {self.lxc_hostname} " \
f"--cores {self.cpu} " \
f"--memory {self.memory} " \
f"--swap {self.memory} " \
f"--net0 name=eth0,bridge={self.bridge},ip={self.ipv4},hwaddr={self.mac},type=veth " \
f"--onboot {int(self.start_on_boot == 0)} " \
f"--ostype {self.os_name} " \
f"--password {self.password} " \
f"--storage {self.storage} " \
f"--unprivileged {not self.privileged} " \
f"--rootfs volume={self.storage}:{self.disk},size={self.disk} " \
f"--ssh-public-keys /root/.ssh/id_rsa.pub " \
f"--unprivileged {not self.privileged}"
else:
# Update command
pct_command = f"pct set {self.lxc_id} " \
f"--hostname {self.lxc_hostname} " \
f"--cores {self.cpu} " \
f"--memory {self.memory} " \
f"--swap {self.memory} " \
f"--net0 name=eth0,bridge={self.bridge},ip={self.ipv4},hwaddr={self.mac},type=veth " \
f"--onboot {int(self.start_on_boot == 0)} " \
f"--ostype {self.os_name} "
# TODO: add gateway4
# f"ip6={self.ipv6},gw6={self.gateway6},trunks={self.vlan} " \ # TODO
return pct_command
def get_tteck_env_variables(self):
"""
Get TTECK environment variables to run scripts silently
! Deprecated for now !
:return: environment variables
"""
env_variables = {
"CT_TYPE": "1",
"PW": self.password,
"CT_ID": self.lxc_id,
"HN": self.lxc_hostname,
"DISK_SIZE": self.disk,
"CORE_COUNT": self.cpu,
"RAM_SIZE": self.memory,
"BRG": self.bridge,
"NET": self.ipv4,
"GATE": self.gateway4,
"DISABLEIP6": "no",
"MTU": "",
"SD": "",
"NS": "",
"MAC": self.mac,
"VLAN": self.vlan,
# "SSH": self.ssh,
"VERB": "no"
}
env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()])
return env_command
exception_on_empty_stdout=exception_on_empty_stdout)

114
src/lxc/lxc_utils.py Normal file
View File

@ -0,0 +1,114 @@
import json
from pathlib import Path
from .lxc import LXC
lxcs = []
def get_all_lxcs():
return lxcs
def load_lxc(lxc_id: int, filepath: Path):
"""Load LXC from JSON file
Parameters
----------
lxc_id : int
ID of the LXC to load
filepath : pathlib.Path
Path object to the JSON file of the LXC to load
Examples
--------
>>> load_lxc(100, Path("<full-path>/resources/lxc/100/config.json"))
"""
# Load JSON data
data = json.loads(filepath.read_text())
# Extract values from JSON
lxc_id = lxc_id
lxc_hostname = data["lxc_hostname"]
os = data["os"]
resources = data["resources"]
network = data["network"]
options = data["options"]
creation = data["creation"]
deploy = data["deploy"]
# Create LXC object
lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy)
lxcs.append(lxc)
def get_tteck_env_variables(self):
"""
Get TTECK environment variables to run scripts silently
! Deprecated for now !
:return: environment variables
"""
env_variables = {
"CT_TYPE": "1",
"PW": self.password,
"CT_ID": self.lxc_id,
"HN": self.lxc_hostname,
"DISK_SIZE": self.disk,
"CORE_COUNT": self.cpu,
"RAM_SIZE": self.memory,
"BRG": self.bridge,
"NET": self.ipv4,
"GATE": self.gateway4,
"DISABLEIP6": "no",
"MTU": "",
"SD": "",
"NS": "",
"MAC": self.mac,
"VLAN": self.vlan,
# "SSH": self.ssh,
"VERB": "no"
}
env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()])
return env_command
def generate_pct_command_for_lxc(lxc: LXC, create: bool = True):
"""
Get pct command to create/edit LXC
:return: pct command
"""
if create:
# Create command
pct_command = f"pct create {lxc.get_id()} {lxc.get_os_template()} " \
f"--hostname {lxc.get_hostname()} " \
f"--cores {lxc.get_cpu()} " \
f"--memory {lxc.get_memory()} " \
f"--swap {lxc.get_swap()} " \
f"--net0 name=eth0,bridge={lxc.get_bridge()},ip={lxc.get_ipv4()},hwaddr={lxc.get_mac()},type=veth " \
f"--onboot {int(lxc.is_start_on_boot())} " \
f"--ostype {lxc.get_os_name()} " \
f"--password {lxc.get_password()} " \
f"--storage {lxc.get_storage()} " \
f"--unprivileged {not lxc.is_privileged()} " \
f"--rootfs volume={lxc.get_storage()}:{lxc.get_disk()},size={lxc.get_disk()} " \
f"--ssh-public-keys /root/.ssh/id_rsa.pub " \
f"--unprivileged {not lxc.is_privileged()}"
else:
# Update command
pct_command = f"pct set {lxc.get_id()} " \
f"--hostname {lxc.get_hostname()} " \
f"--cores {lxc.get_cpu()} " \
f"--memory {lxc.get_memory()} " \
f"--swap {lxc.get_memory()} " \
f"--net0 name=eth0,bridge={lxc.get_bridge()},ip={lxc.get_ipv4()},hwaddr={lxc.get_mac()},type=veth " \
f"--onboot {int(lxc.is_start_on_boot())} " \
f"--ostype {lxc.get_os_name()} "
# TODO: add gateway4
# f"ip6={self.ipv6},gw6={self.gateway6},trunks={self.vlan} " \ # TODO
return pct_command

View File

@ -1,72 +0,0 @@
import json
from pathlib import Path
from . import LXC
lxcs = []
def get_all_lxcs():
"""Get all LXC objects
Returns
-------
list
List of all loaded LXC objects
"""
return lxcs
def get_lxc(lxc_id: int):
"""Get LXC by ID
Parameters
----------
lxc_id : int
ID of the LXC to get
Returns
-------
LXC
LXC object
"""
for lxc in lxcs:
if lxc.get_id() == lxc_id:
return lxc
return None
def load_lxc(lxc_id: int, filepath: Path):
"""Load LXC from JSON file
Parameters
----------
lxc_id : int
ID of the LXC to load
filepath : pathlib.Path
Path object to the JSON file of the LXC to load
Examples
--------
>>> load_lxc(100, Path("<full-path>/resources/lxc/100/config.json"))
"""
# Load JSON data
data = json.loads(filepath.read_text())
# Extract values from JSON
lxc_id = lxc_id
lxc_hostname = data["lxc_hostname"]
os = data["os"]
resources = data["resources"]
network = data["network"]
options = data["options"]
creation = data["creation"]
deploy = data["deploy"]
# Create LXC object
lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy)
lxcs.append(lxc)

View File

@ -1,21 +1,45 @@
import logging
from pathlib import Path
from pathlib import PosixPath, PurePosixPath, Path
from . import project_path
from .lxc.utils import load_lxc, get_all_lxcs
from .lxc.lxc_utils import load_lxc, get_all_lxcs
from .utils import git_utils
from .utils.proxmox import ProxmoxHost
def run():
def run(args):
# Check if running local or SSH
if args.host is None:
pve = ProxmoxHost(path=Path(args.path))
else:
pve = ProxmoxHost(host=args.host, user=args.username, port=args.port, path=Path(args.path))
pve.connect()
if args.repo is None:
logging.warning("No repo provided, skipping updates/cloning...")
else:
# Check if the repo is already cloned
if not pve.has_directory(pve.repo_path):
logging.info(f"Cloning repository {args.repo} to {args.path}...")
git_utils.clone_repo(url=args.repo, path=args.path, linux_machine=pve)
elif pve.has_directory(args.path.joinpath(".git")):
logging.info(f"Repository already cloned at {args.path}, updating...")
git_utils.update_repo(path=args.path, linux_machine=pve)
if not pve.has_directory(pve.repo_path):
raise FileNotFoundError(f"Repo not found at {pve.repo_path}")
# Read all files in the resources directory
resources = Path(project_path).joinpath("resources").glob("*")
resources = pve.list_dir(args.path)
# resources = Path(project_path).joinpath("resources").glob("*")
# Go through each LXC file
for resource in resources:
if resource.name == "lxc":
# Read all files in the LXC directory
lxc_folders = Path(project_path).joinpath("resources", "lxc").glob("*")
lxc_folders = PosixPath(project_path).joinpath("resources", "lxc").glob("*")
for lxc_folder in lxc_folders:
lxc_file = Path(project_path).joinpath("resources", "lxc", lxc_folder, "config.json")
lxc_file = PosixPath(project_path).joinpath("resources", "lxc", lxc_folder, "config.json")
load_lxc(filepath=lxc_file, lxc_id=int(lxc_folder.name))

195
src/utils/commands_utils.py Normal file
View File

@ -0,0 +1,195 @@
from pathlib import PurePosixPath
from src.utils import utils
from src.utils.machine import LinuxMachine
def run_docker_command(linux_machine: LinuxMachine, container: str, command: str):
"""Run a command inside a docker container on a linux host
Parameters
----------
linux_machine : LinuxMachine
The LinuxMachine object of the linux host to run the command in
container : str
Name of the docker container to run the command in
command : str
Command to run in the docker container
Examples
--------
>>> run_docker_command(linux_machine, "<container-name>", "<command>")
"""
linux_machine.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False)
def run_docker_compose_command(linux_machine: LinuxMachine, command: str, working_directory: str = None):
"""Run a docker-compose command on a linux host
Parameters
----------
linux_machine: LinuxMachine
The LinuxMachine object of the linux host to run the command in
command: str
The docker-compose command to run
working_directory: str, optional
The working directory to run the command in
Examples
--------
>>> run_docker_compose_command(linux_machine, "up -d", "/home/user/traefik")
"""
docker_compose_exec = "docker-compose"
if not linux_machine.has_program(docker_compose_exec):
docker_compose_exec = "docker compose"
if working_directory is not None:
linux_machine.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True)
else:
linux_machine.run_command(f"{docker_compose_exec} {command}", return_status_code=True)
def download_file(linux_machine: LinuxMachine, url: str, destination: str):
"""Download a file from a URL to the Linux Machine and save it to the destination
Parameters
----------
linux_machine: LinuxMachine
The LinuxMachine object of the linux host to download the file to
url: str
URL of the file to download
destination: str
Path to the destination to save the file to
Needs to end with a trailing slash
Examples
--------
>>> download_file(linux_machine, "https://example.com/file.zip", "/home/user/")
"""
if type(url) is list:
for u in url:
linux_machine.run_command(f"wget {u} --directory-prefix={destination}", return_status_code=True)
else:
linux_machine.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True)
def unzip_file(linux_machine: LinuxMachine, path: str, destination: str = None):
"""Unzip a file
Parameters
----------
linux_machine: LinuxMachine
The LinuxMachine object of the linux host to unzip the file in
path: str
Path to the file to unzip
destination: str, optional
Path to the destination to unzip the file to
If not specified, it will unzip the file in the same directory as the file
Needs to end with a trailing slash
Examples
--------
>>> unzip_file(linux_machine, "/home/user/file.zip", "/home/user/extracted_files/")
>>> unzip_file(linux_machine, "/home/user/file.tar.gz", "/home/user/extracted_files/")
"""
if destination is None:
destination = PurePosixPath(path).parent
if ".zip" in path:
linux_machine.run_command(f"unzip {path} -d {destination}", return_status_code=True)
elif ".tar.gz" in path:
linux_machine.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}",
return_status_code=True)
def install_package(linux_machine: LinuxMachine, package: str or list):
"""Install a package in the Linux Machine
Parameters
----------
linux_machine: LinuxMachine
The LinuxMachine object of the host to install the package in
package: str or list
Name(s) of the package(s) to install
Examples
--------
>>> install_package(linux_machine, "nginx")
>>> install_package(linux_machine, ["nginx", "apache2"])
"""
if type(package) is list:
for p in package:
linux_machine.run_command(f"{utils.get_install_package_command(linux_machine.get_os_name())} {package}",
return_status_code=True)
else:
linux_machine.run_command(f"{utils.get_install_package_command(linux_machine.get_os_name())} {package}",
return_status_code=True)
def remove_package(linux_machine: LinuxMachine, package: str or list):
"""Remove a package in the Linux Machine
Parameters
----------
linux_machine: LinuxMachine
The LinuxMachine object of the host to remove the package in
package: str or list
Name(s) of the package(s) to remove
Examples
--------
>>> remove_package(linux_machine, "nginx")
>>> remove_package(linux_machine, ["nginx", "apache2"])
"""
if type(package) is list:
packages = []
for p in package:
packages.append(p)
linux_machine.run_command(
f"{utils.get_remove_package_command(linux_machine.get_os_name())} {' '.join(packages)}",
return_status_code=True)
else:
linux_machine.run_command(f"{utils.get_remove_package_command(linux_machine.get_os_name())} {package}",
return_status_code=True)
def replace_in_files(linux_machine: LinuxMachine, path: str or list, search: str, replace: str,
case_sensitive: bool = False):
"""Replace a string in one or multiples files in a LinuxMachine
Parameters
----------
linux_machine : LinuxMachine
The LinuxMachine object of the host to replace the string in
path : str or list of str
Path to the file(s) to replace the string in
search : str
String to search for
replace : str
String to replace the search string with
case_sensitive : bool, optional
Whether the search should be case sensitive or not
Examples
--------
>>> replace_in_files(linux_machine, "/home/user/file.txt", "username=root", "username=administrator"
>>> replace_in_files(linux_machine, ["/home/user/file1.txt", "/home/user/file2.txt"], \
"username=root", "username=administrator", case_sensitive=True)
"""
if type(path) is list:
for p in path:
linux_machine.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}",
return_status_code=True)
else:
linux_machine.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}",
return_status_code=True)

15
src/utils/git_utils.py Normal file
View File

@ -0,0 +1,15 @@
import logging
from src.utils.machine import LinuxMachine
def clone_repo(url, path, linux_machine: LinuxMachine):
"""Clone the given repository to the given path"""
logging.info(f"Cloning repository {url} to {path}...")
linux_machine.run_command(f"git clone {url} {path}")
def update_repo(path, linux_machine: LinuxMachine):
"""Update the given repository"""
logging.info(f"Updating repository at {path}...")
linux_machine.run_command(f"git -C {path} pull")

118
src/utils/machine.py Normal file
View File

@ -0,0 +1,118 @@
from pathlib import Path
class LinuxMachine():
def __init__(self):
pass
def get_hostname(self):
return self.run_command("hostname")
def get_uptime(self):
return self.run_command("uptime -p")
def get_os_name(self):
"""Get OS name"""
return self.run_command("""cat /etc/os-release | grep -E '^NAME=' | cut -d '=' -f 2 | tr -d '"'""")
def get_memory(self):
"""Get memory"""
return self.run_command("free -m | grep Mem | awk '{print $2}'")
def get_ipv4(self):
if self.has_program("ip"):
return self.run_command("""ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'""")
elif self.has_program("ifconfig"):
return self.run_command(command="ifconfig eth0 | awk '/inet addr/{print substr($2,6)}'")
def get_ipv6(self):
pass
def get_mac(self):
"""Get MAC address"""
return self.run_command("""cat /sys/class/net/$(ip route show default | awk '/default/ {print $5}')/address""")
def start(self):
pass
def stop(self):
pass
def reboot(self):
pass
def has_program(self, program: str):
"""Check if program is installed on LXC
:param program: program executable name
:return: boolean
"""
if type(program) == str:
return self.run_command("which " + program, return_status_code=True) == 0
elif type(program) == list:
return all((self.run_command("which " + p, return_status_code=True) == 0) for p in program)
def has_file(self, file: str or Path):
"""Check if file exists on LXC
:param file: file or path
:return: boolean
"""
if isinstance(file, Path):
file = str(file.as_posix())
return self.run_command("test -f " + file, return_status_code=True) == 0
def has_directory(self, directory: str or Path):
"""Check if directory exists on LXC
:param directory: directory path
:return: boolean
"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
return self.run_command("test -d " + directory, return_status_code=True) == 0
def run_command(self, command, **kwargs):
pass
def run_script(self, script: str or Path):
pass
def list_dir(self, directory: str or Path):
pass
def create_file(self, file: str or Path, permission: int = 644):
"""Create file"""
if isinstance(file, Path):
file = str(file.as_posix())
self.run_command(f"touch {file}", return_status_code=True)
if permission != 644:
self.run_command(f"chmod {permission} {file}", return_status_code=True)
def create_directory(self, directory: str or Path, permission: int = 755):
"""Create directory"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
self.run_command(f"mkdir -p {directory}", return_status_code=True)
if permission != 755:
self.run_command(f"chmod -R {permission} {directory}", return_status_code=True)
def delete_file(self, file: str or Path):
"""Delete file"""
if isinstance(file, Path):
file = str(file.as_posix())
self.run_command(f"rm {file}", return_status_code=True)
def delete_directory(self, directory: str or Path):
"""Delete directory"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
self.run_command(f"rm -rf {directory}", return_status_code=True)

248
src/utils/proxmox.py Normal file
View File

@ -0,0 +1,248 @@
from __future__ import annotations
import fnmatch
import subprocess
from pathlib import Path, PosixPath
from typing import TYPE_CHECKING
from fabric import Connection
from src.utils.machine import LinuxMachine
if TYPE_CHECKING:
from ..lxc.lxc import LXC
class ProxmoxHost(LinuxMachine):
"""Class to represent a Proxmox host."""
def __init__(self, host: str = None, user: str = "root", port: int = 22, path: Path = None):
"""Initialize a Proxmox host.
If no host is provided, it will use the local machine as the Proxmox host.
Parameters
----------
host: str, optional
hostname or IP address of the Proxmox host
user: str, optional
username to use for SSH connection (default: root)
port: int, optional
port to use for SSH connection (default: 22)
"""
super().__init__()
self.host = host
self.user = user
self.port = port
self.connection = None
self.repo_path = path
def __str__(self):
return f"ProxmoxHost({self.host}, {self.user}, {self.port})"
def __repr__(self):
return f"ProxmoxHost({self.host}, {self.user}, {self.port})"
def connect(self):
"""Connect to the Proxmox host."""
if self.host is not None:
self.connection = Connection(host=self.host, user=self.user, port=self.port)
def get_connection(self):
"""Get the connection to the Proxmox host."""
return self.connection
def run_command(self, command: str, return_status_code: bool = False, exception_on_exit: bool = True,
exception_on_empty_stdout: bool = False):
"""Run a command on the Proxmox VE host
The default behavior is as follows :
- runs the command on the Proxmox VE host
- throws an exception if the exit status is not 0
- returns the stdout of the command
-- if the stdout is None or empty, throws an exception
If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True:
- runs the command on the Proxmox VE host
- returns the status code of the command
Parameters
----------
command: str
command to run
return_status_code: bool, optional
should it return the exit code and not the stdout, disables exception_on_exit
exception_on_exit: bool, optional
should an exception be thrown if the exit status code is not 0
even though it's True by default, if you use return_status_code, it is disabled
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is None or empty
shell: bool, optional
should the command be run in a shell or not, see python subprocess documentation for more informations
Returns
-------
str
command output by default
int
command exit code if return_status_code is True
Raises
------
Exception
if the exit status is not 0 and exception_on_exit is True
Exception
if the stdout is None and return_status_code is False
Exception
if the stdout is empty and exception_on_empty_stdout is True
"""
# Check if host is None, if it is, run the command locally, else run it on the host via SSH
if self.host is None:
command = subprocess.run(command, shell=True, capture_output=True, encoding="utf-8")
elif self.connection is not None:
command = self.connection.run(command, hide=True, warn=True, encoding="utf-8")
else:
raise Exception("No host or connection provided")
# If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception
if command.return_code != 0 and exception_on_exit and not return_status_code:
raise Exception(f"Error while running command: \n{command.stderr}")
if return_status_code:
return command.return_code
# Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout
if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout:
raise Exception(
f"Error, no output from command, try using the command with return_status_code instead: \n{command.stderr}")
elif command.stdout is None or command.stdout == "":
return ""
# Decode stdout if it's bytes
if type(command.stdout) == bytes:
return command.stdout.decode().rstrip()
return command.stdout.rstrip()
def get_version(self):
"""Get the version of the Proxmox host."""
return self.run_command("pveversion")
def get_all_lxcs(self):
"""Get all the LXCs on the Proxmox host."""
pct_list_output = self.run_command("pct list")
pct_list_output = pct_list_output.split("\n")[1:]
ids = [line.split()[0] for line in pct_list_output]
if not ids:
return []
return ids
def does_lxc_exist(self, lxc_id):
"""Check if the given LXC exists on the Proxmox host."""
return lxc_id in self.get_all_lxcs()
def is_lxc_running(self, lxc_id):
"""Check if the given LXC is running on the Proxmox host."""
if not self.does_lxc_exist(lxc_id):
return False
pct_status_output = self.run_command(f"pct status {lxc_id}")
return "running" in pct_status_output
def start_lxc(self, lxc_id):
"""Start the given LXC on the Proxmox host."""
if not self.does_lxc_exist(lxc_id):
raise Exception(f"LXC {lxc_id} does not exist")
if not self.is_lxc_running(lxc_id):
self.run_command(f"pct start {lxc_id}")
def stop_lxc(self, lxc_id):
"""Stop the given LXC on the Proxmox host."""
if not self.does_lxc_exist(lxc_id):
raise Exception(f"LXC {lxc_id} does not exist")
if self.is_lxc_running(lxc_id):
self.run_command(f"pct stop {lxc_id}")
def reboot_lxc(self, lxc_id):
"""Reboot the given LXC on the Proxmox host."""
if not self.does_lxc_exist(lxc_id):
raise Exception(f"LXC {lxc_id} does not exist")
self.run_command(f"pct reboot {lxc_id}")
def get_all_vms(self):
"""Get all the VMs on the Proxmox host."""
qm_list_output = self.run_command("qm list")
qm_list_output = qm_list_output.split("\n")[1:]
ids = [line.split()[0] for line in qm_list_output]
if not ids:
return []
return ids
def does_vm_exist(self, vm_id):
"""Check if the given VM exists on the Proxmox host."""
return vm_id in self.get_all_vms()
def is_vm_running(self, vm_id):
"""Check if the given VM is running on the Proxmox host."""
if not self.does_vm_exist(vm_id):
return False
qm_status_output = self.run_command(f"qm status {vm_id}")
return "running" in qm_status_output
def start_vm(self, vm_id):
"""Start the given VM on the Proxmox host."""
if not self.does_vm_exist(vm_id):
raise Exception(f"VM {vm_id} does not exist")
if not self.is_vm_running(vm_id):
self.run_command(f"qm start {vm_id}")
def stop_vm(self, vm_id):
"""Stop the given VM on the Proxmox host."""
if not self.does_vm_exist(vm_id):
raise Exception(f"VM {vm_id} does not exist")
if self.is_vm_running(vm_id):
self.run_command(f"qm stop {vm_id}")
def reboot_vm(self, vm_id):
"""Reboot the given VM on the Proxmox host."""
if not self.does_vm_exist(vm_id):
raise Exception(f"VM {vm_id} does not exist")
self.run_command(f"qm reboot {vm_id}")
def copy_file_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path):
"""Copy the given file to the given LXC."""
if isinstance(source, Path):
source = str(source.as_posix())
if isinstance(destination, Path):
destination = str(destination.as_posix())
self.run_command(f"pct push {lxc.get_id()} {source} {destination}")
def copy_folder_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path):
"""Copy the given folder to the given LXC."""
if isinstance(source, Path):
source = str(source.as_posix())
if isinstance(destination, Path):
destination = str(destination.as_posix())
self.run_command(f"pct push {lxc.get_id()} {source} {destination} -r")
def list_dir(self, directory: str or Path, glob_filter: str = '*'):
"""List the given directory."""
if isinstance(directory, Path):
directory = str(directory.as_posix())
return fnmatch.filter(self.connection.sftp().listdir(path=directory), glob_filter)

View File

@ -1,456 +0,0 @@
from __future__ import annotations
import json
import logging
import os
import pathlib
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from .resources_utils import get_path
from .. import project_path
from ..lxc import commands_utils
if TYPE_CHECKING:
from . import LXC
def get_pve_version():
"""Get PVE version
Returns
-------
str
PVE version
"""
return run_command_on_pve(command="pveversion")
def get_pve_hostname():
"""Get PVE hostname
Returns
-------
str
PVE hostname
"""
return run_command_on_pve(command="hostname")
def does_lxc_exist(lxc_id: int):
"""Check if an LXC exists with the given ID
Parameters
----------
lxc_id: int
Returns
-------
bool
Does LXC exist?
"""
# TODO: only check in VMID column
return str(lxc_id) in run_command_on_pve(command=f"pct list | awk '/{lxc_id}/'", exception_on_empty_stdout=False)
def does_qemu_vm_exist(vm_id: int):
"""Check if a QEMU VM exists with the given ID
Parameters
----------
vm_id: int
Returns
-------
bool
Does QEMU VM exist?
"""
# TODO: only check in VMID column
return str(vm_id) in run_command_on_pve(command=f"qm list {vm_id}", exception_on_empty_stdout=False)
# def execute_tteck_script(script_url: str, env_variables: list):
# """Execute TTECK script with already filled environment variables to run silently (non-interactive)
#
# Parameters
# ----------
# script_url: str
# script url (github or other)
# env_variables: list
# list of environment variables
#
# Returns
# -------
# int
# status code
# """
#
# env_variables = " ".join(env_variables)
#
# run_command_on_pve(command=f"{env_variables} && bash -c \"$(wget -qLO - {script_url}\"")
def get_config():
"""Get the JSON content of config.json file as a dict
Returns
-------
dict
JSON content of config.json file
"""
config_file = os.path.join(project_path / "resources" / "config.json")
with open(config_file, "r") as file:
return json.loads(file.read())
def run_command_on_pve(command: str, return_status_code: bool = False, exception_on_exit: bool = True,
exception_on_empty_stdout: bool = False,
force_local: bool = False, shell: bool = False):
"""Run a command on the Proxmox VE host
The default behavior is as follows :
- runs the command on the Proxmox VE host
- throws an exception if the exit status is not 0
- returns the stdout of the command
-- if the stdout is None or empty, throws an exception
If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True:
- runs the command on the Proxmox VE host
- returns the status code of the command
Parameters
----------
command: str
command to run
return_status_code: bool, optional
should it return the exit code and not the stdout, disables exception_on_exit
exception_on_exit: bool, optional
should an exception be thrown if the exit status code is not 0
even though it's True by default, if you use return_status_code, it is disabled
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is None or empty
force_local: bool, optional
should it be run locally not via ssh even with local mode off in the pve section of config.json
shell: bool, optional
should the command be run in a shell or not, see python subprocess documentation for more informations
Returns
-------
str
command output by default
int
command exit code if return_status_code is True
Raises
------
Exception
if the exit status is not 0 and exception_on_exit is True
Exception
if the stdout is None and return_status_code is False
Exception
if the stdout is empty and exception_on_empty_stdout is True
"""
# Get config
config = get_config()
# Check if PVE is local or remote
if config['pve']['local'] or force_local:
# Run command and return output (not as bytes)
logging.debug(f"Running command on PVE/Machine running this script (locally): {command}")
command = subprocess.run(command, shell=shell, capture_output=True, encoding="utf-8")
else:
host = config['pve']['host']
username = config['pve']['user']
port = config['pve']['port']
# Log command for debugging
if "pct exec" in command:
logging.debug(f"Running command on LXC {command.split(' ')[2]} through PVE (ssh): {command}")
else:
logging.debug(f"Running command on PVE (ssh): {command}")
# catch errors code
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=shell, capture_output=True,
encoding="utf-8")
# If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception
if command.returncode != 0 and exception_on_exit and not return_status_code:
raise Exception(f"Error while running command: \n{command.stderr}")
if return_status_code:
return command.returncode
# Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout
if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout:
raise Exception(
f"Error, no output from command, try using the command with return_status_code instead: \n{command.stderr}")
elif command.stdout is None or command.stdout == "":
return ""
# Decode stdout if it's bytes
if type(command.stdout) == bytes:
return command.stdout.decode().rstrip()
return command.stdout.rstrip()
def run_command_locally(command: str, return_status_code: bool = False, exception_on_exit: bool = False):
"""Force running command locally even if local mode is off in config.json
See Also
--------
run_command_on_pve
"""
run_command_on_pve(command=command, exception_on_exit=exception_on_exit, return_status_code=return_status_code,
force_local=True)
def run_command_ssh(command: str, host: str, username: str, port: int = 22, return_status_code: bool = False,
exception_on_exit: bool = False, exception_on_empty_stdout: bool = False, shell: bool = False):
"""Run command on a remote host via SSH
Parameters
----------
command: str
command to run
host: str
host to run the command on
username: str
username to use to connect to the host (usually root for lxc and vm)
port: int, optional
port to use to connect to the host (default: 22)
exception_on_exit: bool, optional
should an exception be thrown if the exit status is not 0
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is empty or none
return_status_code: bool, optional
should it only return the exit code and not the stdout
shell: bool, optional
should the command be run in a shell (default: True)
Returns
-------
str
command output by default
int
command exit code if return_status_code is True
"""
logging.debug(f"Running command on host {host} (ssh): {command}")
# catch errors code
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=True,
capture_output=True, encoding="utf-8")
if command.returncode != 0 and exception_on_exit:
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
if return_status_code:
return command.returncode
return command.stdout.rstrip()
def get_install_package_command(distribution: str):
"""Retrieve the correct command to install a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to install a package, the package name should be appended to it
"""
distribution = distribution.lower()
if distribution == "debian":
return "apt-get install -y"
elif distribution == "ubuntu":
return "apt-get install -y"
elif distribution == "centos":
return "yum install -y"
elif distribution == "fedora":
return "yum install -y"
elif distribution == "gentoo":
return "emerge -a"
elif distribution == "alpine":
return "apk add"
elif distribution == "archlinux":
return "pacman -S --noconfirm"
elif distribution == "devuan":
return "apt-get install -y"
elif distribution == "nixos":
return "nix-env -i"
elif distribution == "opensuse":
return "zypper install -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")
def get_remove_package_command(distribution: str):
"""Retrieve the correct command to uninstall a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to uninstall a package, the package name should be appended to it
"""
distribution = distribution.lower()
if distribution == "debian":
return "apt-get remove -y"
elif distribution == "ubuntu":
return "apt-get remove -y"
elif distribution == "centos":
return "yum remove -y"
elif distribution == "fedora":
return "yum remove -y"
elif distribution == "gentoo":
return "emerge -C"
elif distribution == "alpine":
return "apk del"
elif distribution == "archlinux":
return "pacman -R --noconfirm"
elif distribution == "devuan":
return "apt-get remove -y"
elif distribution == "nixos":
return "nix-env -e"
elif distribution == "opensuse":
return "zypper remove -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")
def get_pve_ssh_public_key():
""" Retrieve the SSH public key of the machine running this script
Returns
-------
str
SSH public key
Raises
------
Exception
If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.)
"""
# TODO: maybe implement custom path for the key
return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub", exception_on_exit=True)
def get_identity_file():
""" Retrieve the SSH public key of the machine running this script
Returns
-------
str
Path to the SSH private key
Raises
------
Exception
If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.)
"""
if get_config()['settings']['identity_file']:
return get_path(lxc=None, path=get_config()['settings']['identity_file'])
elif Path("~/.ssh/id_rsa").expanduser().is_file():
return Path("~/.ssh/id_rsa").expanduser()
else:
raise Exception("No identity file found")
def copy_file_to_pve(path: Path, destination: str):
"""Copy a local file (on the machine running this script) to the PVE
Parameters
----------
path: pathlib.Path
Path object to file on local machine
destination: str
Destination on PVE
Examples
--------
>>> copy_file_to_pve(Path("/home/user/file.txt"), "/root/file.txt")
"""
# create the destination directory if it does not exist
run_command_on_pve(command=f"mkdir -p {destination.rsplit('/', 1)[0]}", return_status_code=True)
config = get_config()
# copy the file to the PVE from the local machine
run_command_locally(command=f"scp {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}",
return_status_code=True)
def copy_file_to_lxc(lxc: LXC, path: Path, destination: str or Path):
"""Copy a local file (on the machine running this program) to the PVE and finally to the LXC
It works by copying the file from the local machine to the PVE using SCP and
then using "pct push" to send it to the LXC.
Sometimes SSH is not installed on freshs LXC, so by using pct push we are sure that it will work.
Parameters
----------
lxc: LXC
LXC object to copy the file to
path: pathlib.Path
Path object to file on local machine
destination: str or pathlib.Path
Destination on LXC
Examples
--------
>>> copy_file_to_lxc(100, Path("/home/user/file.txt"), "/root/")
"""
# define path for temp file on pve
file = path.name
temp_path = f"/tmp/pdj-temp/{file}"
# convert destination to string if it is a Path object
if isinstance(destination, Path):
destination = str(destination.as_posix())
# copy file to pve from local machine
copy_file_to_pve(path, temp_path)
# make sure to create destination folder on lxc
commands_utils.create_folder_in_lxc(lxc, destination)
# copy/push file from pve to lxc
run_command_on_pve(f"pct push {lxc.get_id()} {temp_path} {destination}{file}", return_status_code=True)

View File

@ -6,7 +6,7 @@ from typing import TYPE_CHECKING
from .. import project_path
if TYPE_CHECKING:
from ..lxc import LXC
from ..lxc.lxc import LXC
def get_path(lxc: LXC, path: str):

90
src/utils/utils.py Normal file
View File

@ -0,0 +1,90 @@
def get_install_package_command(distribution: str):
"""Retrieve the correct command to install a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to install a package, the package name should be appended to it
"""
distribution = distribution.lower()
if "debian" in distribution:
return "apt-get install -y"
elif "ubuntu" in distribution:
return "apt-get install -y"
elif "centos" in distribution:
return "yum install -y"
elif "fedora" in distribution:
return "yum install -y"
elif "gentoo" in distribution:
return "emerge -a"
elif "alpine" in distribution:
return "apk add"
elif "archlinux" in distribution:
return "pacman -S --noconfirm"
elif "devuan" in distribution:
return "apt-get install -y"
elif "nixos" in distribution:
return "nix-env -i"
elif "opensuse" in distribution:
return "zypper install -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")
def get_remove_package_command(distribution: str):
"""Retrieve the correct command to uninstall a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to uninstall a package, the package name should be appended to it
"""
distribution = distribution.lower()
if "debian" in distribution:
return "apt-get remove -y"
elif "ubuntu" in distribution:
return "apt-get remove -y"
elif "centos" in distribution:
return "yum remove -y"
elif "fedora" in distribution:
return "yum remove -y"
elif "gentoo" in distribution:
return "emerge -C"
elif "alpine" in distribution:
return "apk del"
elif "archlinux" in distribution:
return "pacman -R --noconfirm"
elif "devuan" in distribution:
return "apt-get remove -y"
elif "nixos" in distribution:
return "nix-env -e"
elif "opensuse" in distribution:
return "zypper remove -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")