added support for step conditions and improved logging and setup ssh identity file in settings
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Mathieu Broillet 2023-06-14 14:31:20 +02:00
parent fbf1e87b39
commit 4cb9752c23
No known key found for this signature in database
GPG Key ID: 7D4F25BC50A0AA32
2 changed files with 138 additions and 18 deletions

View File

@ -30,17 +30,33 @@ def are_all_conditions_met(lxc: LXC):
"""
# creation = lxc.get_creation()
# conditions = creation["conditions"]
#
# result = []
#
# for file in conditions['files']:
# result.append(Path(file).is_file())
# for folder in conditions['folders']:
# result.append(Path(folder).is_dir())
# for program in conditions['programs']:
# result.append(lxc.run_command("which " + program, return_status_code=True) == 0)
creation = lxc.get_creation()
conditions = creation["conditions"]
global_conditions = creation["conditions"]
steps = creation["steps"]
result = []
for file in conditions['files']:
result.append(Path(file).is_file())
for folder in conditions['folders']:
result.append(Path(folder).is_dir())
for program in conditions['programs']:
result.append(lxc.run_command("which " + program, return_status_code=True) == 0)
# Check the global conditions
for condition_type in global_conditions:
result = check_conditions(c_type=condition_type, parent=global_conditions, lxc=lxc, result=result)
# Check the specific conditions for each step
for step in steps:
if "conditions" in step:
for condition_type in step["conditions"]:
result = check_conditions(c_type=condition_type, parent=step["conditions"], lxc=lxc, result=result)
if all(result):
logging.info(f"All creations conditions met for LXC {lxc.lxc_id}, running deploy steps...")
@ -50,6 +66,68 @@ def are_all_conditions_met(lxc: LXC):
return all(result)
def check_conditions(c_type: str, parent: dict, lxc: LXC, result: list = None):
"""Check the conditions for the given LXC
Parameters
----------
c_type : str
The type of condition to check
parent : dict
The parent dictionary of the condition
lxc : LXC
The LXC object used to check the conditions
result : list, optional
The list of results for the conditions to append to, by default None (creates a new list)
Returns
-------
list
The list of results for the conditions
"""
if not result:
result = []
if c_type == "files":
for file in parent[c_type]:
result.append(lxc.has_file(Path(file)))
elif c_type == "folders":
for folder in parent[c_type]:
result.append(lxc.has_directory(Path(folder)))
elif c_type == "programs":
for program in parent[c_type]:
result.append(lxc.has_program(program))
return result
def check_conditions_for_step(step: dict, lxc: LXC):
"""Check the conditions for the given step
Parameters
----------
step : dict
The step to check the conditions for
lxc : LXC
The LXC object used to check the conditions
Returns
-------
bool
True if all conditions are met, False otherwise
"""
result = []
if "conditions" in step:
for condition in step['conditions']:
result.append(check_conditions(condition, parent=step['conditions'], lxc=lxc, result=result))
else:
return True
return all(result)
def optional(var: any, placeholder: any):
"""Return a placeholder if the given variable is None, otherwise return the variable
@ -84,7 +162,14 @@ def run_steps(lxc: LXC):
creation = lxc.get_creation()
creation_steps = creation["steps"]
for step in creation_steps:
for index, step in enumerate(creation_steps):
logging.info(f"Running step {index + 1}/{len(creation_steps)} for LXC {lxc.get_id()}...")
if check_conditions_for_step(step, lxc):
logging.info(f"Conditions already met for step {index + 1}/{len(creation_steps)} for LXC {lxc.get_id()}, "
f"skipping...")
break
match step["type"]:
# Support for scripts

View File

@ -8,6 +8,7 @@ import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from .resources_utils import get_path
from .. import project_path
from ..lxc import commands_utils
@ -169,28 +170,34 @@ def run_command_on_pve(command: str, return_status_code: bool = False, exception
username = config['pve']['user']
port = config['pve']['port']
# Run command on PVE via SSH and return output
# Log command for debugging
if "pct exec" in command:
logging.debug(f"Running command on LXC {command.split(' ')[2]} through PVE (ssh): {command}")
else:
logging.debug(f"Running command on PVE (ssh): {command}")
# catch errors code
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=shell, capture_output=True,
command = subprocess.run(f'ssh -i {get_identity_file()} {username}@{host} -p {port} "{command}"', shell=shell, capture_output=True,
encoding="utf-8")
# If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception
if command.returncode != 0 and exception_on_exit and not return_status_code:
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
raise Exception(f"Error while running command: \n{command.stderr}")
if return_status_code:
return command.returncode
# Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout
if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout:
raise Exception(
f"Error, no output from command on PVE, try using the command with return_status_code instead: \n{command.stderr}")
if command.stdout is None or command.stdout == "":
f"Error, no output from command, try using the command with return_status_code instead: \n{command.stderr}")
elif command.stdout is None or command.stdout == "":
return ""
# Decode stdout if it's bytes
if type(command.stdout) == bytes:
return command.stdout.decode().rstrip()
return command.stdout.rstrip()
@ -345,17 +352,45 @@ def get_remove_package_command(distribution: str):
raise Exception(f"Unsupported distribution: {distribution}")
def get_ssh_public_key():
def get_pve_ssh_public_key():
""" Retrieve the SSH public key of the machine running this script
Returns
-------
str
SSH public key
Raises
------
Exception
If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.)
"""
# TODO: maybe implement custom path for the key
return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub")
return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub", exception_on_exit=True)
def get_identity_file():
""" Retrieve the SSH public key of the machine running this script
Returns
-------
str
Path to the SSH private key
Raises
------
Exception
If an error occurs while retrieving the SSH public key (file not found, permission denied, etc.)
"""
if get_config()['settings']['identity_file']:
return get_path(lxc=None, path=get_config()['settings']['identity_file'])
elif Path("~/.ssh/id_rsa").expanduser().is_file():
return Path("~/.ssh/id_rsa").expanduser()
else:
raise Exception("No identity file found")
def copy_file_to_pve(path: Path, destination: str):
@ -379,7 +414,7 @@ def copy_file_to_pve(path: Path, destination: str):
config = get_config()
# copy the file to the PVE from the local machine
run_command_locally(command=f"scp {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}",
run_command_locally(command=f"scp -i {get_identity_file()} {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}",
return_status_code=True)