Running local scripts now works correctly and improved the varions way to run commands on both the PVE and the LXCs
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Mathieu Broillet 2023-06-14 11:29:38 +02:00
parent 2e70f64269
commit eb191a7a55
No known key found for this signature in database
GPG Key ID: 7D4F25BC50A0AA32
4 changed files with 216 additions and 120 deletions

View File

@ -105,7 +105,7 @@ class LXC:
if is_template_downloaded == "":
logging.info(f"Template {template_name} not found, downloading it...")
proxmox_utils.run_command_on_pve(command=f"pveam download local {template_name}", warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=f"pveam download local {template_name}")
return f"local:vztmpl/{template_name}"
@ -284,21 +284,21 @@ class LXC:
return
else:
logging.info(f"Starting LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct start {self.lxc_id}", warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=f"pct start {self.lxc_id}")
def stop(self):
"""
Stop LXC
"""
logging.info(f"Stopping LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct stop {self.lxc_id}", warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=f"pct stop {self.lxc_id}")
def reboot(self):
"""
Reboot LXC
"""
logging.info(f"Rebooting LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=f"pct reboot {self.lxc_id}", warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=f"pct reboot {self.lxc_id}")
def create(self):
"""
@ -306,10 +306,10 @@ class LXC:
"""
if self.does_exist():
logging.info(f"LXC {self.lxc_id} already exists, skipping creation")
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=False), warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=False))
else:
logging.info(f"Creating LXC {self.lxc_id}")
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=True), warn_exit_status=True)
proxmox_utils.run_command_on_pve(command=self.get_pct_command(create=True))
self.start()
@ -343,9 +343,9 @@ class LXC:
:return: boolean
"""
if type(program) == str:
return self.run_command("which " + program, only_code=True) == 0
return self.run_command("which " + program, return_status_code=True) == 0
elif type(program) == list:
return all((self.run_command("which " + p, only_code=True) == 0) for p in program)
return all((self.run_command("which " + p, return_status_code=True) == 0) for p in program)
def run_script(self, script_path):
"""
@ -356,7 +356,10 @@ class LXC:
"""
return commands_utils.run_script(self, {"lxc_path": script_path})
def run_command(self, command, ssh=False, warn_exit_status=False, only_code=False, working_directory=None):
def run_command(self, command: str, ssh: bool = False, return_status_code: bool = False,
exception_on_exit: bool = False,
exception_on_empty_stdout: bool = False,
working_directory: str = None, shell: bool = False):
"""
Run command on LXC
:param command: command to run
@ -370,10 +373,16 @@ class LXC:
if ssh:
return proxmox_utils.run_command_ssh(command=command, host=self.get_ipv4(), username="root", port=22,
warn_exit_status=warn_exit_status, only_code=only_code)
return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout,
shell=shell)
else:
return proxmox_utils.run_command_on_pve(command=f"pct exec {self.lxc_id} -- {command}",
warn_exit_status=warn_exit_status, only_code=only_code)
return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout,
shell=shell)
def get_pct_command(self, create=True):
"""

View File

@ -54,11 +54,11 @@ def _run_script_on_lxc(lxc: LXC, path: Path):
Path to the script inside the LXC storage
"""
lxc.run_command(f"bash {str(path)}")
lxc.run_command(f"bash {str(path.as_posix())}", shell=True)
def _run_local_script_on_lxc(lxc: LXC, path: Path):
"""Runs a script present on the machine running this program, inside the LXC
"""Runs a script present on the local machine running this program, inside the LXC
It works by copying the script from the local machine to the PVE using SCP and
then using "pct push" to send it to the LXC.
@ -70,9 +70,17 @@ def _run_local_script_on_lxc(lxc: LXC, path: Path):
Path object to the script on the local machine (machine running this program)
"""
# Copy the script to /tmp and run it from there
proxmox_utils.copy_file_to_lxc(lxc.get_id(), path, "/tmp/pdj-scripts/")
lxc.run_command(f"bash /tmp/pdj-scripts/{path.name}")
# define the path of the script inside the LXC
script_path_in_lxc = Path("/tmp/pdj-scripts/").joinpath(path.name)
# Copy the script to the LXC
proxmox_utils.copy_file_to_lxc(lxc, path, f"{str(script_path_in_lxc.parent.as_posix())}/")
# and run it from there
_run_script_on_lxc(lxc, script_path_in_lxc)
# once it has been run, remove the script from the LXC
remove_file_in_lxc(lxc, script_path_in_lxc)
# with open(path, "r") as file:
# script = file.read()
@ -95,10 +103,10 @@ def _run_remote_script_on_lxc(lxc: LXC, url: str):
"""
if lxc.has_program("curl"):
lxc.run_command(f"curl -sSL {url} | bash")
lxc.run_command(f"curl -sSL {url} | bash", shell=True)
def create_folder_in_lxc(lxc: LXC, path: str, permission=755):
def create_folder_in_lxc(lxc: LXC, path: str or Path, permission=755):
"""Create a folder in the LXC
It's possible to specify the permission of the folder, by default it uses 755
@ -106,19 +114,39 @@ def create_folder_in_lxc(lxc: LXC, path: str, permission=755):
----------
lxc: LXC
The LXC object of the LXC to create the folder in
path: str
path: str or pathlib.Path
Path to the folder to create in the LXC
permission: int, optional
Permission of the folder to create in the LXC (default is 755)
"""
lxc.run_command(f"mkdir -p {path}")
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"mkdir -p {path}", return_status_code=True)
if permission != 755:
lxc.run_command(f"chmod -R {permission} {path}")
lxc.run_command(f"chmod -R {permission} {path}", return_status_code=True)
def create_file_in_lxc(lxc: LXC, path: str, permission=644):
def remove_folder_in_lxc(lxc: LXC, path: str or Path):
"""Remove a folder in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to remove the folder in
path: str or pathlib.Path
Path to the folder to remove in the LXC
"""
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"rm -rf {path}", return_status_code=True)
def create_file_in_lxc(lxc: LXC, path: str or Path, permission=644):
"""Create a file in the LXC
It's possible to specify the permission of the file, by default it uses 644
@ -126,29 +154,35 @@ def create_file_in_lxc(lxc: LXC, path: str, permission=644):
----------
lxc: LXC
The LXC object of the LXC to create the file in
path: str
path: str or pathlib.Path
Path to the file to create in the LXC
permission: int, optional
Permission of the file to create in the LXC (default is 644)
"""
lxc.run_command(f"touch {path}")
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"touch {path}", return_status_code=True)
if permission != 644:
lxc.run_command(f"chmod {permission} {path}")
lxc.run_command(f"chmod {permission} {path}", return_status_code=True)
def remove_file_in_lxc(lxc: LXC, path: str):
def remove_file_in_lxc(lxc: LXC, path: str or Path):
"""Remove a file in the LXC
Parameters
----------
lxc: LXC
The LXC object of the LXC to remove the file in
path: str
path: str or pathlib.Path
Path to the file to remove in the LXC
"""
lxc.run_command(f"rm {path}")
if isinstance(path, Path):
path = str(path.as_posix())
lxc.run_command(f"rm {path}", return_status_code=True)
def copy_local_file_to_lxc(lxc: LXC, path: Path, destination: str):
@ -164,7 +198,8 @@ def copy_local_file_to_lxc(lxc: LXC, path: Path, destination: str):
Path to the destination in the LXC
"""
proxmox_utils.run_command_locally(f"scp {str(path)} {lxc.get_ssh_string()}:{destination}")
proxmox_utils.run_command_locally(command=f"scp -B {str(path)} {lxc.get_ssh_string()}:{destination}",
return_status_code=True)
def copy_local_folder_to_lxc(lxc: LXC, path: Path, destination: str):
@ -180,7 +215,8 @@ def copy_local_folder_to_lxc(lxc: LXC, path: Path, destination: str):
Path to the destination in the LXC
"""
proxmox_utils.run_command_locally(f"scp -r {str(path)} {lxc.get_ssh_string()}:{destination}")
proxmox_utils.run_command_locally(command=f"scp -B -r {str(path)} {lxc.get_ssh_string()}:{destination}",
return_status_code=True)
def run_docker_command(lxc: LXC, container: str, command: str):
@ -200,7 +236,7 @@ def run_docker_command(lxc: LXC, container: str, command: str):
>>> run_docker_command(lxc, "<container-name>", "<command>")
"""
lxc.run_command(f"docker exec -it {container} {command}")
lxc.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False)
def run_docker_compose_command(lxc: LXC, command: str, working_directory: str = None):
@ -225,9 +261,9 @@ def run_docker_compose_command(lxc: LXC, command: str, working_directory: str =
docker_compose_exec = "docker compose"
if working_directory is not None:
lxc.run_command(f"cd {working_directory} && {docker_compose_exec} {command}")
lxc.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True)
else:
lxc.run_command(f"{docker_compose_exec} {command}")
lxc.run_command(f"{docker_compose_exec} {command}", return_status_code=True)
def download_file(lxc: LXC, url: str, destination: str):
@ -251,9 +287,9 @@ def download_file(lxc: LXC, url: str, destination: str):
"""Download a file from a URL to the LXC"""
if type(url) is list:
for u in url:
lxc.run_command(f"wget {u} --directory-prefix={destination}")
lxc.run_command(f"wget {u} --directory-prefix={destination}", return_status_code=True)
else:
lxc.run_command(f"wget {url} --directory-prefix={destination}")
lxc.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True)
def unzip_file(lxc: LXC, path: str, destination: str = None):
@ -280,9 +316,9 @@ def unzip_file(lxc: LXC, path: str, destination: str = None):
destination = Path(path).parent
if ".zip" in path:
lxc.run_command(f"unzip {path} -d {destination}")
lxc.run_command(f"unzip {path} -d {destination}", return_status_code=True)
elif ".tar.gz" in path:
lxc.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}")
lxc.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}", return_status_code=True)
def install_package(lxc: LXC, package: str or list):
@ -303,9 +339,11 @@ def install_package(lxc: LXC, package: str or list):
if type(package) is list:
for p in package:
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {p}")
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {p}",
return_status_code=True)
else:
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {package}")
lxc.run_command(f"{proxmox_utils.get_install_package_command(lxc.get_os_name())} {package}",
return_status_code=True)
def remove_package(lxc: LXC, package: str or list):
@ -328,10 +366,12 @@ def remove_package(lxc: LXC, package: str or list):
packages = []
for p in package:
packages.append(p)
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {' '.join(packages)}")
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {' '.join(packages)}",
return_status_code=True)
else:
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {package}")
lxc.run_command(f"{proxmox_utils.get_remove_package_command(lxc.get_os_name())} {package}",
return_status_code=True)
def replace_in_files(lxc: LXC, path: str or list, search: str, replace: str, case_sensitive: bool = False):
@ -360,6 +400,8 @@ def replace_in_files(lxc: LXC, path: str or list, search: str, replace: str, cas
if type(path) is list:
for p in path:
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}")
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {p}",
return_status_code=True)
else:
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}")
lxc.run_command(f"sed {'-i' if case_sensitive else ''} 's/{search}/{replace}/g' {path}",
return_status_code=True)

View File

@ -40,7 +40,7 @@ def are_all_conditions_met(lxc: LXC):
for folder in conditions['folders']:
result.append(Path(folder).is_dir())
for program in conditions['programs']:
result.append(lxc.run_command("which " + program, only_code=True) == 0)
result.append(lxc.run_command("which " + program, return_status_code=True) == 0)
if all(result):
logging.info(f"All creations conditions met for LXC {lxc.lxc_id}, running deploy steps...")
@ -99,14 +99,15 @@ def run_steps(lxc: LXC):
case "folder_copy":
commands_utils.copy_local_folder_to_lxc(lxc, get_path(lxc, step["path"]), step["destination"])
case "command":
lxc.run_command(command=step["command"], working_directory=optional(step["working_directory"], None))
lxc.run_command(command=step["command"], working_directory=optional(step["working_directory"], None),
return_status_code=True)
case "docker":
commands_utils.run_docker_command(lxc, step["container"], step["command"])
case "docker_compose":
commands_utils.run_docker_compose_command(lxc, step["command"],
optional(step["working_directory"], None))
case "git":
lxc.run_command(command=f"git clone {step['url']} {step['destination']}")
lxc.run_command(command=f"git clone {step['url']} {step['destination']}", return_status_code=True)
case "download":
commands_utils.download_file(lxc, step["url"], step["destination"])
case "unzip":

View File

@ -1,11 +1,18 @@
from __future__ import annotations
import json
import logging
import os
import pathlib
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from .. import project_path
from ..lxc import commands_utils
if TYPE_CHECKING:
from . import LXC
def get_pve_version():
@ -16,7 +23,7 @@ def get_pve_version():
str
PVE version
"""
return run_command_on_pve(command="pveversion", warn_exit_status=True)
return run_command_on_pve(command="pveversion")
def get_pve_hostname():
@ -28,7 +35,7 @@ def get_pve_hostname():
PVE hostname
"""
return run_command_on_pve(command="hostname", warn_exit_status=True)
return run_command_on_pve(command="hostname")
def does_lxc_exist(lxc_id: int):
@ -45,7 +52,7 @@ def does_lxc_exist(lxc_id: int):
"""
# TODO: only check in VMID column
return str(lxc_id) in run_command_on_pve(command=f"pct list | awk '/{lxc_id}/'")
return str(lxc_id) in run_command_on_pve(command=f"pct list | awk '/{lxc_id}/'", exception_on_empty_stdout=False)
def does_qemu_vm_exist(vm_id: int):
@ -62,28 +69,28 @@ def does_qemu_vm_exist(vm_id: int):
"""
# TODO: only check in VMID column
return str(vm_id) in run_command_on_pve(command=f"qm list {vm_id}")
return str(vm_id) in run_command_on_pve(command=f"qm list {vm_id}", exception_on_empty_stdout=False)
def execute_tteck_script(script_url: str, env_variables: list):
"""Execute TTECK script with already filled environment variables to run silently (non-interactive)
Parameters
----------
script_url: str
script url (github or other)
env_variables: list
list of environment variables
Returns
-------
int
status code
"""
env_variables = " ".join(env_variables)
run_command_on_pve(command=f"{env_variables} && bash -c \"$(wget -qLO - {script_url}\"", warn_exit_status=True)
# def execute_tteck_script(script_url: str, env_variables: list):
# """Execute TTECK script with already filled environment variables to run silently (non-interactive)
#
# Parameters
# ----------
# script_url: str
# script url (github or other)
# env_variables: list
# list of environment variables
#
# Returns
# -------
# int
# status code
# """
#
# env_variables = " ".join(env_variables)
#
# run_command_on_pve(command=f"{env_variables} && bash -c \"$(wget -qLO - {script_url}\"")
def get_config():
@ -100,54 +107,62 @@ def get_config():
return json.loads(file.read())
def run_command_on_pve(command: str, warn_exit_status: bool = False, only_code: bool = False, local: bool = False):
"""Run command on the Proxmox VE host
def run_command_on_pve(command: str, return_status_code: bool = False, exception_on_exit: bool = True,
exception_on_empty_stdout: bool = False,
force_local: bool = False, shell: bool = False):
"""Run a command on the Proxmox VE host
The default behavior is as follows :
- runs the command on the Proxmox VE host
- throws an exception if the exit status is not 0
- returns the stdout of the command
-- if the stdout is None or empty, throws an exception
If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True:
- runs the command on the Proxmox VE host
- returns the status code of the command
Parameters
----------
command: str
command to run
warn_exit_status: bool, optional
should an exception be thrown if the exit status is not 0
only_code: bool, optional
should it only return the exit code and not the stdout
local: bool, optional
return_status_code: bool, optional
should it return the exit code and not the stdout, disables exception_on_exit
exception_on_exit: bool, optional
should an exception be thrown if the exit status code is not 0
even though it's True by default, if you use return_status_code, it is disabled
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is None or empty
force_local: bool, optional
should it be run locally not via ssh even with local mode off in the pve section of config.json
shell: bool, optional
should the command be run in a shell or not, see python subprocess documentation for more informations
Returns
-------
str
command output by default
int
command exit code if only_code is True
command exit code if return_status_code is True
Raises
------
Exception
if the exit status is not 0 and warn_exit_status is True
if the exit status is not 0 and exception_on_exit is True
Exception
if the stdout is None and return_status_code is False
Exception
if the stdout is empty and exception_on_empty_stdout is True
"""
# Get config
config = get_config()
# Check if PVE is local or remote
if config['pve']['local'] or local:
if config['pve']['local'] or force_local:
# Run command and return output (not as bytes)
logging.debug(f"Running command on PVE/Machine running this script (locally): {command}")
command = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8")
if command.returncode != 0 and warn_exit_status:
logging.error(f"Error while running command on PVE: \n{command.stderr}")
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
if only_code:
return command.returncode
return command.stdout.decode().rstrip()
command = subprocess.run(command, shell=shell, capture_output=True, encoding="utf-8")
else:
host = config['pve']['host']
@ -158,20 +173,28 @@ def run_command_on_pve(command: str, warn_exit_status: bool = False, only_code:
logging.debug(f"Running command on PVE (ssh): {command}")
# catch errors code
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=shell, capture_output=True,
encoding="utf-8")
if command.returncode != 0 and warn_exit_status:
logging.error(f"Error while running command on PVE: \n{command.stderr}")
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
if command.returncode != 0 and exception_on_exit and not return_status_code:
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
if only_code:
return command.returncode
if return_status_code:
return command.returncode
return command.stdout.rstrip()
if (command.stdout is None or command.stdout == "") and exception_on_empty_stdout:
raise Exception(
f"Error, no output from command on PVE, try using the command with return_status_code instead: \n{command.stderr}")
if command.stdout is None or command.stdout == "":
return ""
if type(command.stdout) == bytes:
return command.stdout.decode().rstrip()
return command.stdout.rstrip()
def run_command_locally(command: str, warn_exit_status: bool = False, only_code: bool = False):
def run_command_locally(command: str, return_status_code: bool = False, exception_on_exit: bool = False):
"""Force running command locally even if local mode is off in config.json
See Also
@ -179,11 +202,12 @@ def run_command_locally(command: str, warn_exit_status: bool = False, only_code:
run_command_on_pve
"""
run_command_on_pve(command=command, warn_exit_status=warn_exit_status, only_code=only_code, local=True)
run_command_on_pve(command=command, exception_on_exit=exception_on_exit, return_status_code=return_status_code,
force_local=True)
def run_command_ssh(command: str, host: str, username: str, port: int = 22, warn_exit_status: bool = False,
only_code: bool = False):
def run_command_ssh(command: str, host: str, username: str, port: int = 22, return_status_code: bool = False,
exception_on_exit: bool = False, exception_on_empty_stdout: bool = False, shell: bool = False):
"""Run command on a remote host via SSH
Parameters
@ -196,17 +220,21 @@ def run_command_ssh(command: str, host: str, username: str, port: int = 22, warn
username to use to connect to the host (usually root for lxc and vm)
port: int, optional
port to use to connect to the host (default: 22)
warn_exit_status: bool, optional
exception_on_exit: bool, optional
should an exception be thrown if the exit status is not 0
only_code: bool, optional
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is empty or none
return_status_code: bool, optional
should it only return the exit code and not the stdout
shell: bool, optional
should the command be run in a shell (default: True)
Returns
-------
str
command output by default
int
command exit code if only_code is True
command exit code if return_status_code is True
"""
@ -214,13 +242,12 @@ def run_command_ssh(command: str, host: str, username: str, port: int = 22, warn
# catch errors code
command = subprocess.run(f'ssh {username}@{host} -p {port} "{command}"', shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
capture_output=True, encoding="utf-8")
if command.returncode != 0 and warn_exit_status:
logging.error(f"Error while running command on PVE: \n{command.stderr}")
if command.returncode != 0 and exception_on_exit:
raise Exception(f"Error while running command on PVE: \n{command.stderr}")
if only_code:
if return_status_code:
return command.returncode
return command.stdout.rstrip()
@ -328,7 +355,7 @@ def get_ssh_public_key():
"""
# TODO: maybe implement custom path for the key
return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub", warn_exit_status=True)
return run_command_on_pve(command="cat ~/.ssh/id_rsa.pub")
def copy_file_to_pve(path: Path, destination: str):
@ -346,23 +373,29 @@ def copy_file_to_pve(path: Path, destination: str):
>>> copy_file_to_pve(Path("/home/user/file.txt"), "/root/file.txt")
"""
# create the destination directory if it does not exist
run_command_on_pve(command=f"mkdir -p {destination.rsplit('/', 1)[0]}", return_status_code=True)
config = get_config()
run_command_locally(f"scp {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}")
# copy the file to the PVE from the local machine
run_command_locally(command=f"scp {str(path)} {config['pve']['user']}@{config['pve']['host']}:{destination}",
return_status_code=True)
def copy_file_to_lxc(lxc_id: int, path: Path, destination: str):
"""Copy a local file (on the machine running this script) to the PVE and finally to the LXC
It works by copying the script from the local machine to the PVE using SCP and
def copy_file_to_lxc(lxc: LXC, path: Path, destination: str or Path):
"""Copy a local file (on the machine running this program) to the PVE and finally to the LXC
It works by copying the file from the local machine to the PVE using SCP and
then using "pct push" to send it to the LXC.
Sometimes SSH is not installed on freshs LXC, so by using pct push we are sure that it will work.
Parameters
----------
lxc_id: int
LXC ID
lxc: LXC
LXC object to copy the file to
path: pathlib.Path
Path object to file on local machine
destination: str
destination: str or pathlib.Path
Destination on LXC
Examples
@ -370,8 +403,19 @@ def copy_file_to_lxc(lxc_id: int, path: Path, destination: str):
>>> copy_file_to_lxc(100, Path("/home/user/file.txt"), "/root/")
"""
# define path for temp file on pve
file = path.name
temp_path = f"/tmp/proxmoxdeployscripts/{file}"
temp_path = f"/tmp/pdj-temp/{file}"
# convert destination to string if it is a Path object
if isinstance(destination, Path):
destination = str(destination.as_posix())
# copy file to pve from local machine
copy_file_to_pve(path, temp_path)
run_command_on_pve(f"pct push {lxc_id} {temp_path} {destination}")
# make sure to create destination folder on lxc
commands_utils.create_folder_in_lxc(lxc, destination)
# copy/push file from pve to lxc
run_command_on_pve(f"pct push {lxc.get_id()} {temp_path} {destination}{file}", return_status_code=True)