proxmox utils, lxc utils and configs

This commit is contained in:
Mathieu Broillet 2023-06-09 14:51:47 +02:00
parent e6a27d0bef
commit c9af80fa86
6 changed files with 320 additions and 92 deletions

19
.drone.yml Normal file
View File

@ -0,0 +1,19 @@
kind: pipeline
type: docker
name: build_python_project
steps:
- name: build
image: python/alpine
commands:
- pip install -r requirements.txt
- python -m nuitka --onefile run.py --include-data-dir=./resources=resources --output-filename "ProxmoxDeploy${DRONE_TAG##v}.exe"
- name: gitea_release
image: plugins/gitea-release
settings:
api_key:
from_secret: api_key
base_url: https://git.broillet.ch
files: run.exe
when:
event: tag

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
recursive-include resources *

View File

@ -1,8 +1,10 @@
{
"lxc_id": "0000",
"lxc_hostname": "test",
"os": "alpine",
"release": "3.17",
"os": {
"name": "alpine",
"release": "3.17"
},
"resources": {
"cpu": "1",
"memory": "512",
@ -22,5 +24,10 @@
"start_on_boot": "false",
"password": "qwertz1234",
"ssh": false
}
},
"creation": {
"type": "tteck",
"script": "https://git.broillet.ch/Mathieu/ProxmoxHelperScripts/src/branch/main/install/alpine-vaultwarden-install.sh"
},
"deploy": []
}

View File

@ -27,3 +27,6 @@ def run():
load_lxc(file.read())
print(get_all_lxcs())
# print(get_all_lxcs()[0].get_tteck_env_variables())
# get_all_lxcs()[0].create()

View File

@ -1,4 +1,7 @@
import json
import logging
from src.utils import proxmox_utils
lxcs = []
@ -11,14 +14,14 @@ def get_all_lxcs():
return lxcs
def get_lxc(id):
def get_lxc(lxc_id):
"""
Get LXC by ID
:param id: lxc id
:param lxc_id: lxc id
:return: lxc object
"""
for lxc in lxcs:
if lxc.get_lxc_id() == id:
if lxc.get_id() == lxc_id:
return lxc
return None
@ -37,33 +40,15 @@ def load_lxc(file):
# Extract values from JSON
lxc_id = data["lxc_id"]
lxc_hostname = data["lxc_hostname"]
disk_size = data["resources"]["disk"]
core_count = data["resources"]["cpu"]
ram_size = data["resources"]["memory"]
bridge = data["network"]["bridge"]
ipv4 = data["network"]["ipv4"]
gateway = data["network"]["gateway"]
disable_ipv6 = "yes" if data["network"]["ipv6"] == "" else "no"
mac = data["network"]["mac"]
vlan = data["network"]["vlan"]
ssh = "yes" if data["options"]["ssh"] else "no"
os = data["os"]
resources = data["resources"]
network = data["network"]
options = data["options"]
creation = data["creation"]
deploy = data["deploy"]
# Create LXC object
lxc = LXC(
lxc_id=lxc_id,
lxc_hostname=lxc_hostname,
disk_size=disk_size,
core_count=core_count,
ram_size=ram_size,
bridge=bridge,
ipv4=ipv4,
gateway=gateway,
disable_ipv6=disable_ipv6,
mac=mac,
vlan=vlan,
ssh=ssh
)
lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy)
lxcs.append(lxc)
@ -72,20 +57,36 @@ class LXC:
LXC class
"""
def __init__(self, lxc_id, lxc_hostname, disk_size, core_count, ram_size, bridge, ipv4, gateway, disable_ipv6, mac,
vlan, ssh):
def __init__(self, lxc_id, lxc_hostname, os, resources, network, options, creation, deploy):
self.lxc_id = lxc_id
self.lxc_hostname = lxc_hostname
self.disk_size = disk_size
self.core_count = core_count
self.ram_size = ram_size
self.bridge = bridge
self.ipv4 = ipv4
self.gateway = gateway
self.disable_ipv6 = disable_ipv6
self.mac = mac
self.vlan = vlan
self.ssh = ssh
self.os = os
self.os_name = os["name"]
self.os_release = os["release"]
self.resources = resources
self.cpu = resources["cpu"]
self.memory = resources["memory"]
self.disk = resources["disk"]
self.storage = resources["storage"]
self.network = network
self.bridge = network["bridge"]
self.ipv4 = network["ipv4"]
self.ipv6 = network["ipv6"]
self.mac = network["mac"]
self.gateway = network["gateway"]
self.vlan = network["vlan"]
self.options = options
self.privileged = options["privileged"]
self.start_on_boot = options["start_on_boot"]
self.password = options["password"]
self.ssh = options["ssh"]
self.creation = creation
self.deploy = deploy
def __str__(self):
return f"LXC {self.lxc_id} ({self.lxc_hostname})"
@ -99,81 +100,210 @@ class LXC:
def __hash__(self):
return hash(self.lxc_id)
def get_lxc_id(self):
def get_id(self):
"""
Get LXC ID
:return: lxc id
"""
return self.lxc_id
def get_lxc_hostname(self):
def get_hostname(self):
"""
Get LXC hostname
:return: lxc hostname
"""
return self.lxc_hostname
def get_disk_size(self):
return self.disk_size
def get_os(self):
"""
Get OS
:return: os
"""
return self.os
def get_core_count(self):
return self.core_count
def get_os_name(self):
"""
Get OS name
:return: os name
"""
return self.os_name
def get_ram_size(self):
return self.ram_size
def get_os_release(self):
"""
Get OS release
:return: os release
"""
return self.os_release
def get_resources(self):
"""
Get resources
:return: resources
"""
return self.resources
def get_cpu(self):
"""
Get CPU
:return: cpu
"""
return self.cpu
def get_memory(self):
"""
Get memory
:return: memory
"""
return self.memory
def get_disk(self):
"""
Get disk
:return: disk
"""
return self.disk
def get_storage(self):
"""
Get storage
:return: storage
"""
return self.storage
def get_network(self):
"""
Get network
:return: network
"""
return self.network
def get_bridge(self):
"""
Get bridge
:return: bridge
"""
return self.bridge
def get_ipv4(self):
"""
Get IPv4
:return: ipv4
"""
return self.ipv4
def get_gateway(self):
return self.gateway
def get_disable_ipv6(self):
return self.disable_ipv6
def get_ipv6(self):
"""
Get IPv6
:return: ipv6
"""
return self.ipv6
def get_mac(self):
"""
Get MAC
:return: mac
"""
return self.mac
def get_gateway(self):
"""
Get gateway
:return: gateway
"""
return self.gateway
def get_vlan(self):
"""
Get VLAN
:return: vlan
"""
return self.vlan
def get_ssh(self):
def get_options(self):
"""
Get options
:return: options
"""
return self.options
def is_privileged(self):
"""
Is privileged
:return: privileged
"""
return self.privileged
def is_start_on_boot(self):
"""
Is start on boot
:return: start on boot
"""
return self.start_on_boot
def get_password(self):
"""
Get password
:return: password
"""
return self.password
def is_ssh_enabled(self):
"""
Is SSH enabled
:return: ssh
"""
return self.ssh
def get_deploy(self):
"""
Get deployements
:return: deployements
"""
return self.deploy
def create(self):
"""
Create LXC
:return:
"""
if proxmox_utils.does_lxc_exist(self.lxc_id):
logging.info(f"LXC {self.lxc_id} already exists, skipping creation")
return
else:
logging.info(f"Creating LXC {self.lxc_id}")
if self.creation['type'] == "tteck":
proxmox_utils.execute_tteck_script(self.creation['script'], self.get_tteck_env_variables())
def deploy(self):
pass
def get_tteck_env_variables(self):
"""
Get TTECK environment variables to run scripts silently
:return: environment variables
"""
env_template = '''CT_TYPE="1"
PW=""
CT_ID={lxc_id}
HN={lxc_hostname}
DISK_SIZE="{disk_size}"
CORE_COUNT="{core_count}"
RAM_SIZE="{ram_size}"
BRG="{bridge}"
NET="{ipv4}"
GATE="{gateway}"
DISABLEIP6="{disable_ipv6}"
MTU=""
SD=""
NS=""
MAC="{mac}"
VLAN="{vlan}"
SSH="{ssh}"
VERB="no"'''
env_variables = {
"CT_TYPE": "1",
"PW": self.password,
"CT_ID": self.lxc_id,
"HN": self.lxc_hostname,
"DISK_SIZE": self.disk,
"CORE_COUNT": self.cpu,
"RAM_SIZE": self.memory,
"BRG": self.bridge,
"NET": self.ipv4,
"GATE": self.gateway,
"DISABLEIP6": "no",
"MTU": "",
"SD": "",
"NS": "",
"MAC": self.mac,
"VLAN": self.vlan,
"SSH": self.ssh,
"VERB": "no"
}
# Format the environment variables template
env_variables = env_template.format(
lxc_id=self.lxc_id,
lxc_hostname=self.lxc_hostname,
disk_size=self.disk_size,
core_count=self.core_count,
ram_size=self.ram_size,
bridge=self.bridge,
ipv4=self.ipv4,
gateway=self.gateway,
disable_ipv6=self.disable_ipv6,
mac=self.mac,
vlan=self.vlan,
ssh=self.ssh
)
return env_variables
env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()])
return env_command

View File

@ -0,0 +1,68 @@
import logging
import subprocess
def get_pve_version():
"""
Get PVE version
:return: pve version
"""
return run_command_on_pve("pveversion")
def get_pve_hostname():
"""
Get PVE hostname
:return: pve hostname
"""
return run_command_on_pve("hostname")
def does_lxc_exist(lxc_id):
"""
Check if LXC exists
:param lxc_id: lxc id
:return: does lxc exists
"""
# TODO: only check in VMID column
return lxc_id in run_command_on_pve(f"pct list {lxc_id}")
def does_qemu_vm_exist(vm_id):
"""
Check if QEMU VM exists
:param vm_id: vm id
:return: does qemu vm exists
"""
# TODO: only check in VMID column
return vm_id in run_command_on_pve(f"qm list {vm_id}")
def execute_tteck_script(script_url, env_variables):
"""
Execute TTECK script with already filled environment variables to run silently (non-interactive)
:param script_url: script url (github or other)
:param env_variables: list of environment variables
:return: status code
"""
env_variables = " ".join(env_variables)
run_command_on_pve(f"{env_variables} && bash -c \"$(wget -qLO - {script_url}\"")
def run_command_on_pve(command):
"""
Run command on PVE
:param command: command
:return: command
"""
# Run command and return output (not as bytes)
logging.debug(f"Running command on PVE: \n{command}")
return subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8").stdout.decode()