Compare commits


No commits in common. "main" and "v1.0.0" have entirely different histories.
main ... v1.0.0

34 changed files with 992 additions and 1937 deletions


@@ -4,16 +4,12 @@ name: build_python_project
steps:
- name: build
#image: python:3.11-bullseye
image: git.broillet.ch/mathieu/python3.11-bullseye-buildessential:1.0 # python3.11-bullseye image with build-essential & patchelf (apt) and nuitka(pip) installed
image: python:alpine
commands:
- pip install -r requirements.txt
# - apk add build-base patchelf # not needed with custom image
# - apt update -y # not needed with custom image
# - apt install build-essential patchelf -y # not needed with custom image
# - pip install nuitka # not needed with custom image
# - python -m nuitka --onefile run.py --include-data-dir=./resources=resources --output-filename="ProxmoxDeploy${DRONE_TAG##v}" # not needed with new system with repo
- python -m nuitka --onefile run.py --output-filename="ProxmoxDeploy${DRONE_TAG##v}"
- apk add build-base patchelf
- pip install nuitka
- python -m nuitka --onefile run.py --include-data-dir=./resources=resources --output-filename="ProxmoxDeploy${DRONE_TAG##v}"
- name: gitea_release
image: plugins/gitea-release
settings:

Binary file not shown (image, 90 KiB)

.idea/.gitignore vendored Normal file

@@ -0,0 +1,364 @@
# Created by https://www.toptal.com/developers/gitignore/api/python,intellij,jetbrains
# Edit at https://www.toptal.com/developers/gitignore?templates=python,intellij,jetbrains
### Intellij ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### Intellij Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
# https://plugins.jetbrains.com/plugin/7973-sonarlint
.idea/**/sonarlint/
# SonarQube Plugin
# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator-enh.xml
.idea/**/markdown-navigator/
# Cache file creation bug
# See https://youtrack.jetbrains.com/issue/JBR-2257
.idea/$CACHE_FILE$
# CodeStream plugin
# https://plugins.jetbrains.com/plugin/12206-codestream
.idea/codestream.xml
# Azure Toolkit for IntelliJ plugin
# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij
.idea/**/azureSettings.xml
### JetBrains ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
# AWS User-specific
# Generated files
# Sensitive or high-churn files
# Gradle
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
# Mongo Explorer plugin
# File-based project format
# IntelliJ
# mpeltonen/sbt-idea plugin
# JIRA plugin
# Cursive Clojure plugin
# SonarLint plugin
# Crashlytics plugin (for Android Studio and IntelliJ)
# Editor-based Rest Client
# Android studio 3.1+ serialized cache file
### JetBrains Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
# https://plugins.jetbrains.com/plugin/7973-sonarlint
# SonarQube Plugin
# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin
# Markdown Navigator plugin
# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced
# Cache file creation bug
# See https://youtrack.jetbrains.com/issue/JBR-2257
# CodeStream plugin
# https://plugins.jetbrains.com/plugin/12206-codestream
# Azure Toolkit for IntelliJ plugin
# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python,intellij,jetbrains

.idea/ProxmoxDeploy.iml Normal file

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>


@@ -0,0 +1,12 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N802" />
</list>
</option>
</inspection_tool>
</profile>
</component>


@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

.idea/misc.xml Normal file

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (ProxmoxDeploy)" project-jdk-type="Python SDK" />
</project>

.idea/modules.xml Normal file

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ProxmoxDeploy.iml" filepath="$PROJECT_DIR$/.idea/ProxmoxDeploy.iml" />
</modules>
</component>
</project>

.idea/vcs.xml Normal file

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

.idea/workspace.xml Normal file

@@ -0,0 +1,148 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ChangeListManager">
<list default="true" id="ef90a940-975e-45ac-b0cb-e18c5b09ff29" name="Changes" comment="base">
<change afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change afterPath="$PROJECT_DIR$/requirements.txt" afterDir="false" />
<change afterPath="$PROJECT_DIR$/run.py" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/get_path_file.py" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main.py" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/utils/__init__.py" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/utils/lxc_utils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/main.py" beforeDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Python Script" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectId" id="2QxREeVLT4IlTxqXEpzPYhB1R0E" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true">
<ConfirmationsSetting value="2" id="Add" />
</component>
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;WebServerToolWindowFactoryState&quot;: &quot;false&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;preferences.pluginManager&quot;
}
}</component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="C:\Users\lmbbrm3\PycharmProjects\ProxmoxDeploy\resources\lxc" />
</key>
</component>
<component name="RunManager" selected="Python.run">
<configuration name="main" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="ProxmoxDeploy" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/main.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="run" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="ProxmoxDeploy" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/run.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python.run" />
</list>
</recent_temporary>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="ef90a940-975e-45ac-b0cb-e18c5b09ff29" name="Changes" comment="" />
<created>1686293135999</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1686293135999</updated>
<workItem from="1686293144782" duration="176000" />
<workItem from="1686293341823" duration="6751000" />
</task>
<task id="LOCAL-00001" summary="base">
<created>1686293500295</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1686293500295</updated>
</task>
<option name="localTasksCounter" value="2" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="base" />
<option name="LAST_COMMIT_MESSAGE" value="base" />
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/ProxmoxDeploy$main.coverage" NAME="main Coverage Results" MODIFIED="1686295586466" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
<SUITE FILE_PATH="coverage/ProxmoxDeploy$run.coverage" NAME="run Coverage Results" MODIFIED="1686304048372" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
</component>
</project>


@@ -1,5 +0,0 @@
FROM python:3.11-bullseye
RUN apt-get update
RUN apt-get install -y build-essential patchelf
RUN pip install nuitka


@@ -1,51 +0,0 @@
# Proxmox Deploy
[![Build Status](https://drone.broillet.ch/api/badges/mathieu/ProxmoxDeploy/status.svg)](https://drone.broillet.ch/mathieu/ProxmoxDeploy)
![Logo](./.git-images/logo.png)
## Description
Proxmox Deploy is Python-based software that lets you manage your Proxmox homelab using YAML files.
This project started when I discovered another incredible project, "Proxmox VE Helper Scripts" by tteck. It allowed me to easily create and deploy apps using the LXC feature of Proxmox, providing isolation at near-zero cost in performance.
Soon after that, I found myself maintaining a multitude of SSH hosts and keys to access all my LXCs; furthermore, it made managing my LXCs with VSCode and the VSCode agent (via the Remote SSH extension) complicated and hard to maintain.
So I created ProxmoxDeploy. I took a lot of inspiration from tteck's scripts; again, thank you for your project.
## Why?
As my homelab grew, I realised it was getting harder and harder to keep everything in sync and up to date.
So I decided to create a script to manage my Proxmox homelab.
## How it works
The concept is simple: you have a Git repository with the following structure:
```
.
├── config.json
├── lxc
│ ├── <id>
│ │ ├── config.json
│ │ ├── <your files>
│ │ └── <your folders>
│ └── <id>
│ ├── ...
│── qemu
│ ├── <id>
│ │ ├── config.json
│ │ ├── <your files>
│ │ └── <your folders>
│ └── <id>
│ ├── ...
│── scripts
│ ├── <your scripts>
│ └── ...
```
*See the wiki for more information about the structure and the different files.*
Ideally, you would have some sort of CI workflow (GitHub Actions/Drone/Gitea Actions) that automatically runs the latest binary of PD (ProxmoxDeploy) against your repository, so that every new commit is automatically deployed to your homelab. See the automatic deployment page on the wiki for more info.
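For illustration, a minimal sketch of such an automated run, assuming the CLI flags defined in `run.py` on the main branch; the binary name, host and repository URL are hypothetical placeholders:
```python
# Hedged sketch only: flag names mirror run.py on the main branch;
# the binary name, host and repo URL are hypothetical placeholders.
import subprocess

subprocess.run([
    "./ProxmoxDeploy1.0.0",                                  # hypothetical release binary
    "--host", "pve.example.lan",                             # hypothetical Proxmox host
    "--repo", "https://git.example.lan/homelab/deploy.git",  # hypothetical config repository
    "/root/proxmox-deploy",                                  # path where the repo is/will be cloned
], check=True)
```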
## Usage
### Download
Download the pre-compiled binaries from the release page or [build it yourself](https://git.broillet.ch/mathieu/ProxmoxDeploy/wiki/Build-it-yourself).
## Documentation
Have a look at the [wiki](https://git.broillet.ch/mathieu/ProxmoxDeploy/wiki).


@@ -1,66 +0,0 @@
#!/bin/bash
# check if ssh is installed
if ! command -v ssh &>/dev/null; then
echo "SSH is not installed"
else
echo "SSH is already installed"
exit 1
fi
if lsb_release -a 2>/dev/null | grep -q -E "Debian|Ubuntu"; then
echo "Running Debian or Ubuntu"
apt-get install openssh-server
systemctl start sshd
elif cat /etc/os-release 2>/dev/null | grep -q -i "alpine"; then
echo "Running Alpine"
# Stop and remove Dropbear
rc-service dropbear stop
rc-update del dropbear
apk del dropbear* -f
# Stop and remove SSHD
rc-service sshd stop
rc-update del sshd
apk del openssh* -f
# Clean up Dropbear and SSH configurations
rm -rf /etc/dropbear
rm -rf /etc/ssh
rm /etc/init.d/ssh
# Reboot now if needed
# reboot now
# Install OpenSSH and necessary packages
apk add openssh gcompat libstdc++ curl bash git grep
apk add procps --no-cache
# Start and add SSHD to startup
rc-service sshd start
rc-update add sshd
# Update AllowTcpForwarding setting
sed -i 's/^#*AllowTcpForwarding.*/AllowTcpForwarding yes/' /etc/ssh/sshd_config
# Update PermitTunnel setting
sed -i 's/^#*PermitTunnel.*/PermitTunnel yes/' /etc/ssh/sshd_config
# Uncomment the line if needed (remove '#' at the beginning)
sed -i 's/^#PermitTunnel.*/PermitTunnel yes/' /etc/ssh/sshd_config
# Restart the SSH service to apply the changes
/etc/init.d/sshd restart
else
echo "Unknown distribution"
exit 1
fi
# Set PermitRootLogin to prohibit-password
sed -i 's/^#*PermitRootLogin.*/PermitRootLogin prohibit-password/' /etc/ssh/sshd_config
# Generate SSH keys (doesn't overwrite if keys already exists)
ssh-keygen -q -t rsa -N '' <<< $'\nn' >/dev/null 2>&1


@@ -1,5 +0,0 @@
cat <<EOF >/etc/apk/repositories
https://dl-cdn.alpinelinux.org/alpine/edge/main
https://dl-cdn.alpinelinux.org/alpine/edge/community
https://dl-cdn.alpinelinux.org/alpine/edge/testing
EOF


@@ -1,2 +0,0 @@
fabric~=3.1.0
PyYAML~=6.0


@@ -0,0 +1,33 @@
{
"lxc_id": "0000",
"lxc_hostname": "test",
"os": {
"name": "alpine",
"release": "3.17"
},
"resources": {
"cpu": "1",
"memory": "512",
"disk": "10",
"storage": "local-lvm"
},
"network": {
"bridge": "vmbr0",
"ipv4": "dhcp",
"ipv6": "",
"mac": "00:00:00:00:00:00",
"gateway": "",
"vlan": ""
},
"options": {
"privileged": "false",
"start_on_boot": "false",
"password": "qwertz1234",
"ssh": false
},
"creation": {
"type": "tteck",
"script": "https://git.broillet.ch/Mathieu/ProxmoxHelperScripts/src/branch/main/install/alpine-vaultwarden-install.sh"
},
"deploy": []
}
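As an aside, a minimal sketch of reading such a file, assuming the field names consumed by `lxc_utils.load_lxc` on the main branch (which parses the content with `yaml.safe_load`; JSON is valid YAML) and a hypothetical local path:
```python
# Hedged sketch only: field names mirror lxc_utils.load_lxc on the main branch;
# the path is a hypothetical example.
import yaml

with open("resources/lxc/0000/config.json") as fh:
    data = yaml.safe_load(fh.read())   # yaml.safe_load also accepts plain JSON

print(data["lxc_hostname"])            # -> "test"
print(data["resources"]["memory"])     # -> "512"
print(data["options"]["ssh"])          # -> False
```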

run.py

@@ -1,26 +1,7 @@
import argparse
import logging
from src import main
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Deploy JSON file as LXCs and VMs to your proxmox server")
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
parser.add_argument("--host", help="host to use for ssh", type=str)
parser.add_argument("--username", help="username to use for ssh (default: root)", type=str, default="root")
parser.add_argument("--port", help="port to use for ssh (default: 22)", type=str, default=22)
parser.add_argument("--repo", help="git repo to clone", type=str)
parser.add_argument("path", help="path where your repo is cloned or will be cloned", type=str)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.DEBUG)
else:
logging.basicConfig(format='[%(levelname)s] : %(message)s', level=logging.INFO)
# Run the main program
main.run(args)
logging.getLogger().setLevel(logging.INFO)
main.run()


@@ -1,23 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from ..utils import steps_utils
if TYPE_CHECKING:
from .lxc import LXC
def run_steps(lxc: LXC):
"""Run the creation steps for the given LXC
Parameters
----------
lxc : LXC
The LXC object used to run the creation steps
"""
creation_steps = lxc.creation["steps"]
# Run the steps
steps_utils.run_steps(lxc, creation_steps)


@@ -1,248 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from . import creation_utils, lxc_utils
from ..machine.machine import LinuxMachine
from ..utils import conditions_utils
if TYPE_CHECKING:
from ..proxmox.proxmox import ProxmoxHost
class LXC(LinuxMachine):
"""LXC object"""
def __init__(self, id: int, hostname: str, os: dict, resources: dict, network: dict, options: dict,
creation: dict, deploy: dict, features: dict, proxmox_host: ProxmoxHost):
super().__init__()
self.id = id
self.hostname = hostname
self.os = os
self.os_name = os.get("name")
self.os_release = os.get("release")
self.resources = resources
self.cpu = resources.get("cpu")
self.memory = resources.get("memory")
self.swap = resources.get("swap")
self.disk = resources.get("disk")
self.storage = resources.get("storage")
self.network = network
self.bridge = network.get("bridge")
self.ipv4 = network.get("ip")
self.ipv4_netmask = 0
self.ipv6 = network.get("ip6")
self.mac = network.get("hwaddr")
self.gateway4 = network.get("gw")
self.gateway6 = network.get("gw6")
self.vlan = network.get("trunks")
self.options = options
self.privileged = options.get("privileged")
self.start_on_boot = options.get("start_on_boot")
self.startup_order = options.get("startup_order")
self.password = options.get("password")
self.creation = creation
self.deploy = deploy
self.features = features
self.pve = proxmox_host
def __str__(self):
return f"LXC {self.id} ({self.hostname})"
def __repr__(self):
return f"LXC {self.id} ({self.hostname})"
def __eq__(self, other):
return self.id == other.id
def __hash__(self):
return hash(self.id)
def get_os_template(self):
"""
Get OS template
:return: os template
"""
# TODO: might have to run "pveam update" before running this command on fresh install of PVE
template_name = self.pve.run_command(
f"pveam available --section system | awk /\'{self.os_name}-{self.os_release}/\' | awk \'{{print $2}}\'")
is_template_downloaded = self.pve.run_command(command=f"pveam list local | awk /'{template_name}/'")
if is_template_downloaded == "":
logging.info(f"Template {template_name} not found, downloading it...")
self.pve.run_command(command=f"pveam download local {template_name}")
return f"local:vztmpl/{template_name}"
def retrieve_ipv4(self, netmask: bool = False, use_ssh: bool = False):
"""
Get IPv4
:return: ipv4
"""
if self.ipv4 == "dhcp" or self.ipv4 == "auto":
if self.is_running():
if self.has_program("ip", use_ssh=use_ssh):
self.ipv4_netmask = self.run_command(
"""ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 """,
use_ssh=use_ssh)
self.ipv4 = str(self.ipv4_netmask).split("/")[0]
else:
return self.ipv4
if netmask:
return self.ipv4_netmask
else:
return self.ipv4
def retrieve_ipv6(self):
"""
Get IPv6
:return: ipv6
"""
return self.ipv6
def get_ssh_string(self):
# TODO: implement hostname and maybe ipv6?
return "root@" + self.retrieve_ipv4()
def is_running(self):
"""
Is running
:return: is lxc running? (boolean)
"""
return self.pve.is_lxc_running(self.id)
def does_exist(self):
"""
Does exist
:return: does lxc exist? (boolean)
"""
return self.pve.does_lxc_exist(self.id)
def start(self):
"""
Start LXC
"""
self.pve.start_lxc(self.id)
def stop(self):
"""
Stop LXC
"""
self.pve.stop_lxc(self.id)
def reboot(self):
"""
Reboot LXC
"""
logging.info(f"Rebooting LXC {self.id}")
self.pve.reboot_lxc(self.id)
def create(self):
"""
Create LXC
"""
if self.does_exist():
logging.info(f"LXC {self.id} already exists, skipping creation")
self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=False))
else:
logging.info(f"Creating LXC {self.id}")
self.pve.run_command(command=lxc_utils.generate_pct_command_for_lxc(self, create=True))
self.start()
# Make sure bash is installed for later use
if not self.has_program("bash", use_ssh=False):
self.install_package("bash", use_ssh=False)
# Install and configure OpenSSH on LXC
logging.info("Setting up SSH for LXC")
lxc_utils.run_protected_script(lxc=self, script_path="protected/scripts/install-config-ssh.sh")
self.pve.run_command(f"ssh-keygen -f '/root/.ssh/known_hosts' -R {self.retrieve_ipv4(use_ssh=False)}")
# Add main, community and testing repo for Alpine
# if "alpine" in self.os_name:
# logging.info("Setting up Alpine repositories for LXC")
# lxc_utils.run_protected_script(lxc=self, script_path="protected/scripts/setup-repo-alpine.sh")
# self.run_command("apk update", use_ssh=False)
def run_creation(self):
"""
Run the creations checks and steps
"""
# Check if LXC is running
if not self.is_running():
self.start()
logging.info(f"Running creation checks for LXC {self.id}")
# Check if all creation conditions are met
if not conditions_utils.are_all_conditions_met(self):
logging.info(f"Not all creation conditions met for LXC {self.id}, running creation steps...")
# Run creation steps
logging.info(f"Running creation steps for LXC {self.id}")
creation_utils.run_steps(self)
else:
logging.info(f"All creation conditions met for LXC {self.id}, skipping creation steps...")
def deploy(self):
pass
def run_script(self, script_path, use_ssh: bool = True):
"""
Run script on LXC filesystem using bash
:param script_path:
:return:
"""
return self.run_command(command=f"export TERM=linux && bash {script_path}", use_ssh=use_ssh)
def run_command(self, command: str, return_status_code: bool = False,
exception_on_exit: bool = False,
exception_on_empty_stdout: bool = False,
working_directory: str = None, use_ssh: bool = True):
"""
Run command on LXC
:param command: command to run
:return: command output
return_status_code: bool = False,
exception_on_exit: bool = False,
exception_on_empty_stdout: bool = False,
working_directory: str = None
"""
# logging.debug(f"Running command {command} on LXC {self.lxc_id}")
if type(command) == list:
command = ' && '.join(command)
if working_directory:
command = f"cd {working_directory} && {command}"
# Using pct exec works every time but is 8x slower than using ssh
if use_ssh:
return self.pve.run_command(
command=f"ssh -o StrictHostKeyChecking=no root@{self.retrieve_ipv4()} -- \"{command}\"",
return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout)
else:
return self.pve.run_command(command=f"pct exec {self.id} -- {command}",
return_status_code=return_status_code,
exception_on_exit=exception_on_exit,
exception_on_empty_stdout=exception_on_empty_stdout)
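For context, a hedged usage sketch of the two transport paths described above (SSH by default, `pct exec` as the fallback); `lxc` is assumed to be an LXC instance with a reachable Proxmox host behind it:
```python
# Hedged usage sketch only; "lxc" is an assumed LXC instance.
hostname = lxc.run_command("hostname")                # default: SSH straight into the container (fast path)
uptime = lxc.run_command("uptime -p", use_ssh=False)  # fallback: "pct exec <id> -- ..." on the PVE host
has_curl = lxc.run_command("which curl", return_status_code=True) == 0  # exit code instead of stdout
```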


@@ -1,233 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import yaml
from .lxc import LXC
from ..proxmox.proxmox import ProxmoxHost
from ..utils import utils
from ..utils.resources_utils import get_path
if TYPE_CHECKING:
from ..proxmox.proxmox import ProxmoxHost
lxcs = []
def get_all_lxcs():
return lxcs
def load_lxc(lxc_id: int, content: str or bytes, pve: ProxmoxHost):
"""Load LXC from JSON file
Parameters
----------
lxc_id : int
ID of the LXC to load
content : str or bytes
Content of the JSON file
pve : ProxmoxHost
Proxmox host where the LXC is/will go on
Examples
--------
>>> load_lxc(100, '{ <json-data> }', pve)
"""
if isinstance(content, bytes):
content = content.decode("utf-8")
# Load JSON data
# data = json.loads(content)
data = yaml.safe_load(content)
# Extract values from JSON
hostname = data.get("lxc_hostname")
os = data.get("os")
resources = data.get("resources")
network = data.get("network")
options = data.get("options")
creation = data.get("creation")
deploy = data.get("deploy")
features = data.get("features")
# Create LXC object
lxc = LXC(lxc_id, hostname, os, resources, network, options, creation, deploy, features, pve)
lxcs.append(lxc)
# def get_tteck_env_variables(self):
# """
# Get TTECK environment variables to run scripts silently
# ! Deprecated for now !
#
# :return: environment variables
# """
#
# env_variables = {
# "CT_TYPE": "1",
# "PW": self.password,
# "CT_ID": self.id,
# "HN": self.hostname,
# "DISK_SIZE": self.disk,
# "CORE_COUNT": self.cpu,
# "RAM_SIZE": self.memory,
# "BRG": self.bridge,
# "NET": self.ipv4,
# "GATE": self.gateway4,
# "DISABLEIP6": "no",
# "MTU": "",
# "SD": "",
# "NS": "",
# "MAC": self.mac,
# "VLAN": self.vlan,
# # "SSH": self.ssh,
# "VERB": "no"
# }
#
# env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()])
# return env_command
def generate_pct_command_for_lxc(lxc: LXC, create: bool = True):
"""
Get pct command to create/edit LXC
:return: pct command
"""
# Common parameters for both create and update commands
common_params = [
f"--hostname {lxc.hostname}",
f"--cores {lxc.cpu}",
f"--memory {lxc.memory}",
f"--swap {lxc.swap}",
f"--onboot {int(lxc.start_on_boot)}",
f"--ostype {lxc.os_name}",
]
# Check and include specific net0 parameters
net0_params = ["name=eth0"]
if lxc.network.get("bridge") and lxc.network.get("bridge") != "":
net0_params.append(f"bridge={lxc.network['bridge']}")
if lxc.network.get("firewall" and lxc.network.get("firewall") != ""):
net0_params.append(f"firewall={lxc.network['firewall']}")
if lxc.network.get("gw") and lxc.network.get("gw") != "":
net0_params.append(f"gw={lxc.network['gw']}")
if lxc.network.get("gw6") and lxc.network.get("gw6") != "":
net0_params.append(f"gw6={lxc.network['gw6']}")
if lxc.network.get("hwaddr") and lxc.network.get("hwaddr") != "":
net0_params.append(f"hwaddr={lxc.network['hwaddr']}")
if lxc.network.get("ip") and lxc.network.get("ip") != "":
net0_params.append(f"ip={lxc.retrieve_ipv4(netmask=True)}")
if lxc.network.get("ip6") and lxc.network.get("ip6") != "":
net0_params.append(f"ip6={lxc.retrieve_ipv6()}")
if lxc.network.get("link_down") and lxc.network.get("link_down") != "":
net0_params.append(f"link_down={lxc.network['link_down']}")
if lxc.network.get("mtu") and lxc.network.get("mtu") != "":
net0_params.append(f"mtu={lxc.network['mtu']}")
if lxc.network.get("rate") and lxc.network.get("rate") != "":
net0_params.append(f"rate={lxc.network['rate']}")
if lxc.network.get("tag") and lxc.network.get("tag") != "":
net0_params.append(f"tag={lxc.network['tag']}")
if lxc.network.get("trunks") and lxc.network.get("trunks") != "":
net0_params.append(f"trunks={lxc.network['trunks']}")
if lxc.network.get("type") and lxc.network.get("type") != "":
net0_params.append(f"type={lxc.network['type']}")
if net0_params:
common_params.append(f"--net0 {','.join(net0_params)}")
if create:
# Additional parameters for create command
create_params = [
f"--password {lxc.password}",
f"--storage {lxc.storage}",
f"--unprivileged {not lxc.privileged}",
f"--rootfs volume={lxc.storage}:{lxc.disk},size={lxc.disk}",
"--ssh-public-keys /root/.ssh/id_rsa.pub",
]
# Check and include specific features based on their values
features_params = []
if lxc.features is not None and type(lxc.features) is dict:
if lxc.features.get("force_rw_sys") and lxc.features.get("force_rw_sys") != "":
features_params.append(f"force_rw_sys={lxc.features['force_rw_sys']}")
if lxc.features.get("fuse") and lxc.features.get("fuse") != "":
features_params.append(f"fuse={lxc.features['fuse']}")
if lxc.features.get("keyctl") and lxc.features.get("keyctl") != "":
features_params.append(f"keyctl={lxc.features['keyctl']}")
if lxc.features.get("mknod") and lxc.features.get("mknod") != "":
features_params.append(f"mknod={lxc.features['mknod']}")
if lxc.features.get("mount") and lxc.features.get("mount") != "":
features_params.append(f"mount={';'.join(lxc.features['mount'])}")
if lxc.features.get("nesting") and lxc.features.get("nesting") != "":
features_params.append(f"nesting={lxc.features['nesting']}")
if features_params:
create_params.append(f"--features {','.join(features_params)}")
# Combine common and create-specific parameters
command_params = common_params + create_params
# Create command
pct_command = f"pct create {lxc.id} {lxc.get_os_template()} {' '.join(command_params)}"
else:
# Update command
# Combine common parameters only
command_params = common_params
pct_command = f"pct set {lxc.id} {' '.join(command_params)}"
return pct_command
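A hedged usage sketch of the generator; the exact flags in the resulting string depend on which config fields are set, and `lxc` is an assumed LXC instance (note that `create=True` calls `get_os_template()`, which talks to the Proxmox host):
```python
# Hedged sketch only; "lxc" is an assumed LXC instance.
cmd = generate_pct_command_for_lxc(lxc, create=True)
# cmd is roughly:
#   pct create <id> local:vztmpl/<template> --hostname <hostname> --cores <cpu> --memory <memory> ...
#       --net0 name=eth0,bridge=<bridge>,... --password <password> --storage <storage>
#       --rootfs volume=<storage>:<disk>,size=<disk> --ssh-public-keys /root/.ssh/id_rsa.pub
lxc.pve.run_command(command=cmd)
```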
def run_script_step_parser(lxc: LXC, step: dict):
# Install bash if not installed
# Sometimes only ash or sh are installed, which doesn't work for some scripts
if not lxc.has_program("bash"):
lxc.install_package("bash")
# Run local script
if "path" in step:
if "protected/" in step["path"]:
run_protected_script(lxc, step["path"])
else:
run_repo_script(lxc, step["path"])
# Run remote script
elif "url" in step:
run_remote_script(lxc, step["url"])
# Run script in LXC
elif "lxc_path" in step:
lxc.run_script(step["lxc_path"])
def run_repo_script(lxc: LXC, script_path: str):
# Run local script
script_path = get_path(lxc, script_path)
lxc.pve.copy_file_to_lxc(lxc, script_path, f"/tmp/pdj-temp/{script_path.name}")
lxc.run_script(f"/tmp/pdj-temp/{script_path.name}")
lxc.delete_file(f"/tmp/pdj-temp/{script_path.name}")
def run_protected_script(lxc: LXC, script_path: str):
script_path = get_path(lxc, script_path)
utils.copy_local_file_to_pve(lxc.pve, script_path, f"/tmp/pdj-temp/{script_path.name}")
lxc.pve.copy_file_to_lxc(lxc, f"/tmp/pdj-temp/{script_path.name}", f"/tmp/pdj-temp/{script_path.name}",
use_ssh=False)
lxc.run_script(f"/tmp/pdj-temp/{script_path.name}", use_ssh=False)
lxc.delete_file(f"/tmp/pdj-temp/{script_path.name}", use_ssh=False)
lxc.pve.delete_file(f"/tmp/pdj-temp/{script_path.name}")
def run_remote_script(lxc: LXC, url: str):
# Install curl if not installed
if not lxc.has_program("curl"):
lxc.install_package("curl")
# Run remote script
lxc.run_command(f"curl -sSL {url} | bash")


@@ -1,364 +0,0 @@
from pathlib import Path, PurePosixPath
from . import machine_utils
class LinuxMachine:
def __init__(self):
self.known_programs = []
self.id = None
pass
def update(self):
pass
def get_hostname(self):
return self.run_command("hostname")
def get_uptime(self):
return self.run_command("uptime -p")
def get_os_name(self):
"""Get OS name
Returns
-------
str
OS name
"""
if hasattr(self, "os_name"):
return self.os_name
return self.run_command("""cat /etc/os-release | grep -E '^NAME=' | cut -d '=' -f 2 | tr -d '"'""")
def get_memory(self):
"""Get memory"""
return self.run_command("free -m | grep Mem | awk '{print $2}'")
def retrieve_ipv4(self):
"""Get IPv4 address"""
if self.has_program("ip"):
return self.run_command("""ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'""")
elif self.has_program("ifconfig"):
return self.run_command(command="ifconfig eth0 | awk '/inet addr/{print substr($2,6)}'")
def retrieve_ipv6(self):
pass
def get_mac(self):
"""Get MAC address"""
return self.run_command("""cat /sys/class/net/$(ip route show default | awk '/default/ {print $5}')/address""")
def start(self):
pass
def stop(self):
pass
def reboot(self):
pass
def has_program(self, program: str, use_ssh: bool = True):
"""Check if program is installed on LXC
:param program: program executable name
:return: boolean
"""
if program in self.known_programs:
return True
if type(program) == str:
result = self.run_command("which " + program, return_status_code=True, use_ssh=use_ssh) == 0
if result:
self.known_programs.append(program)
return result
elif type(program) == list:
return all(self.has_program(program=p, use_ssh=use_ssh) for p in program)
def has_file(self, file: str or Path):
"""Check if file exists on LXC
:param file: file or path
:return: boolean
"""
if isinstance(file, Path):
file = str(file.as_posix())
return self.run_command("test -f " + file, return_status_code=True) == 0
def has_directory(self, directory: str or Path):
"""Check if directory exists on LXC
:param directory: directory path
:return: boolean
"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
return self.run_command("test -d " + directory, return_status_code=True) == 0
def run_command(self, command, **kwargs):
pass
def run_script(self, script: str or Path):
return self.run_command(f"bash {script}", return_status_code=True)
def list_dir(self, directory: str or Path):
pass
def create_file(self, file: str or Path, permission: int = 644, owner: str = "root"):
"""Create file"""
if isinstance(file, Path):
file = str(file.as_posix())
self.run_command(f"touch {file}", return_status_code=True)
if permission != 644:
self.run_command(f"chmod {permission} {file}", return_status_code=True)
if owner != "root":
self.run_command(f"chown {owner} {file}", return_status_code=True)
def create_directory(self, directory: str or Path, permission: int = 755, owner: str = "root",
use_ssh: bool = True):
"""Create directory"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
self.run_command(f"mkdir -p {directory}", return_status_code=True, use_ssh=use_ssh)
if permission != 755:
self.run_command(f"chmod -R {permission} {directory}", return_status_code=True, use_ssh=use_ssh)
if owner != "root":
self.run_command(f"chown -R {owner} {directory}", return_status_code=True)
def delete_file(self, file: str or Path, use_ssh: bool = True):
"""Delete file"""
if isinstance(file, Path):
file = str(file.as_posix())
self.run_command(f"rm {file}", return_status_code=True, use_ssh=use_ssh)
def delete_directory(self, directory: str or Path):
"""Delete directory"""
if isinstance(directory, Path):
directory = str(directory.as_posix())
self.run_command(f"rm -rf {directory}", return_status_code=True)
def run_docker_command(self, container, command: str or list):
"""Run a command inside a docker container on a linux host
Parameters
----------
container : str
Name of the docker container to run the command in
command : str
Command to run in the docker container
Examples
--------
>>> self.run_docker_command("<container-name>", "<command>")
"""
if type(command) == list:
command = " && ".join(command)
if not self.has_program("docker"):
raise Exception(f"Docker is not installed on this machine {self.get_hostname()}")
self.run_command(f"docker exec -it {container} {command}", exception_on_empty_stdout=False)
def run_docker_compose_command(self, command: str, working_directory: str = None):
"""Run a docker-compose command on a linux host
Parameters
----------
command: str
The docker-compose command to run
working_directory: str, optional
The working directory to run the command in
Examples
--------
>>> self.run_docker_compose_command("up -d", "/home/user/traefik")
"""
docker_compose_exec = "docker-compose"
if not self.has_program(docker_compose_exec):
docker_compose_exec = "docker compose"
if working_directory is not None:
self.run_command(f"cd {working_directory} && {docker_compose_exec} {command}", return_status_code=True)
else:
self.run_command(f"{docker_compose_exec} {command}", return_status_code=True)
def download_file(self, url: str or list, destination: str):
"""Download a file from a URL to the Linux Machine and save it to the destination
Parameters
----------
url: str or list
URL of the file to download
destination: str
Path to the destination to save the file to
Needs to end with a trailing slash
Examples
--------
>>> self.download_file("https://example.com/file.zip", "/home/user/")
"""
if type(url) is list:
for u in url:
self.download_file(u, destination)
else:
self.run_command(f"wget {url} --directory-prefix={destination}", return_status_code=True)
def unzip_file(self, path: str, destination: str = None):
"""Unzip a file
Parameters
----------
path: str
Path to the file to unzip
destination: str, optional
Path to the destination to unzip the file to
If not specified, it will unzip the file in the same directory as the file
Needs to end with a trailing slash
Examples
--------
>>> self.unzip_file("/home/user/file.zip", "/home/user/extracted_files/")
>>> self.unzip_file("/home/user/file.tar.gz", "/home/user/extracted_files/")
"""
if destination is None:
destination = PurePosixPath(path).parent
if ".zip" in path:
self.run_command(f"unzip {path} -d {destination}", return_status_code=True)
elif ".tar.gz" in path:
self.run_command(f"mkdir -p {destination} && tar -xzf {path} --directory {destination}",
return_status_code=True)
def install_package(self, package: str or list, use_ssh: bool = True):
"""Install a package in the Linux Machine
Parameters
----------
package: str or list
Name(s) of the package(s) to install
Examples
--------
>>> self.install_package("nginx")
>>> self.install_package(["nginx", "apache2"])
"""
if type(package) is list:
packages = []
for p in package:
packages.append(p)
self.run_command(
f"{machine_utils.get_install_package_command(self.get_os_name())} {' '.join(packages)}",
return_status_code=True)
else:
self.run_command(f"{machine_utils.get_install_package_command(self.get_os_name())} {package}",
return_status_code=True, use_ssh=use_ssh)
def remove_package(self, package: str or list):
"""Remove a package in the Linux Machine
Parameters
----------
package: str or list
Name(s) of the package(s) to remove
Examples
--------
>>> self.remove_package("nginx")
>>> self.remove_package(["nginx", "apache2"])
"""
if type(package) is list:
packages = []
for p in package:
packages.append(p)
self.run_command(
f"{machine_utils.get_remove_package_command(self.get_os_name())} {' '.join(packages)}",
return_status_code=True)
else:
self.run_command(f"{machine_utils.get_remove_package_command(self.get_os_name())} {package}",
return_status_code=True)
def replace_in_files(self, path: str or list, search: str, replace: str,
case_sensitive: bool = False):
"""Replace a string in one or multiples files in a LinuxMachine
Parameters
----------
path : str or list of str
Path to the file(s) to replace the string in
search : str
String to search for
replace : str
String to replace the search string with
case_sensitive : bool, optional
Whether the search should be case sensitive or not
Examples
--------
>>> self.replace_in_files("/home/user/file.txt", "username=root", "username=administrator"
>>> self.replace_in_files(["/home/user/file1.txt", "/home/user/file2.txt"], \
"username=root", "username=administrator", case_sensitive=True)
"""
if type(path) is list:
for p in path:
# "-i" edits the file in place; the trailing "I" flag (GNU sed) makes the match case-insensitive
self.run_command(f"sed -i 's/{search}/{replace}/{'g' if case_sensitive else 'gI'}' {p}",
return_status_code=True)
else:
self.run_command(f"sed -i 's/{search}/{replace}/{'g' if case_sensitive else 'gI'}' {path}",
return_status_code=True)
def start_service(self, service: str):
self.run_command(machine_utils.get_services_command(self.get_os_name(), "start", service),
return_status_code=True)
def stop_service(self, service: str):
self.run_command(machine_utils.get_services_command(self.get_os_name(), "stop", service),
return_status_code=True)
def restart_service(self, service: str):
self.run_command(machine_utils.get_services_command(self.get_os_name(), "restart", service),
return_status_code=True)
def enable_service(self, service: str):
self.run_command(machine_utils.get_services_command(self.get_os_name(), "enable", service),
return_status_code=True)
def disable_service(self, service: str):
self.run_command(machine_utils.get_services_command(self.get_os_name(), "disable", service),
return_status_code=True)
def is_folder(self, path: str) -> bool:
self.run_command(f"test -d {path}", return_status_code=True)
def move(self, source, destination, permission: int = None, owner: str = "root"):
self.run_command(f"mv {source} {destination}", return_status_code=True)
if permission is not None:
if self.is_folder(source):
self.run_command(f"chmod -R {permission} {destination}", return_status_code=True)
else:
self.run_command(f"chmod {permission} {destination}", return_status_code=True)
if owner != "root":
if self.is_folder(source):
self.run_command(f"chown -R {owner} {destination}", return_status_code=True)
else:
self.run_command(f"chown {owner} {destination}", return_status_code=True)


@@ -1,145 +0,0 @@
def get_install_package_command(distribution: str):
"""Retrieve the correct command to install a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to install a package, the package name should be appended to it
"""
distribution = distribution.lower()
if "debian" in distribution:
return "apt-get install -y"
elif "ubuntu" in distribution:
return "apt-get install -y"
elif "centos" in distribution:
return "yum install -y"
elif "fedora" in distribution:
return "yum install -y"
elif "gentoo" in distribution:
return "emerge -a"
elif "alpine" in distribution:
return "apk add"
elif "archlinux" in distribution:
return "pacman -S --noconfirm"
elif "devuan" in distribution:
return "apt-get install -y"
elif "nixos" in distribution:
return "nix-env -i"
elif "opensuse" in distribution:
return "zypper install -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")
def get_remove_package_command(distribution: str):
"""Retrieve the correct command to uninstall a package based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Debian, Ubuntu, CentOS, Fedora, Gentoo, Alpine, ArchLinux, Devuan, NixOS, OpenSUSE
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Beginning of the command to uninstall a package, the package name should be appended to it
"""
distribution = distribution.lower()
if "debian" in distribution or "ubuntu" in distribution or "devuan" in distribution:
return "apt-get remove -y"
elif "centos" in distribution or "fedora" in distribution:
return "yum remove -y"
elif "gentoo" in distribution:
return "emerge -C"
elif "alpine" in distribution:
return "apk del"
elif "archlinux" in distribution:
return "pacman -R --noconfirm"
elif "nixos" in distribution:
return "nix-env -e"
elif "opensuse" in distribution:
return "zypper remove -y"
else:
raise Exception(f"Unsupported distribution: {distribution}")
def get_services_command(distribution, operation: str, service: str):
"""Retrieve the correct command to start/stop/restart a service based on the distribution specified
It supports all the distribution supported by Proxmox VE (from the pct command documentation).
Parameters
----------
distribution: str
Name of the distribution as specific in the proxmox pct command documentation
operation: str
Operation to perform on the service, it can be start, stop or restart
service: str
Name of the service to start/stop/restart
See Also
--------
https://pve.proxmox.com/pve-docs/pct.1.html
Returns
-------
str
Command to start/stop/restart the service
"""
distribution = distribution.lower()
operation = operation.lower()
if "debian" in distribution or \
"ubuntu" in distribution or \
"devuan" in distribution or \
"centos" in distribution or \
"fedora" in distribution or \
"archlinux" in distribution or \
"opensuse" in distribution or \
"nixos" in distribution:
if operation == "start":
return f"systemctl start {service}"
elif operation == "stop":
return f"systemctl stop {service}"
elif operation == "restart":
return f"systemctl restart {service}"
elif operation == "enable":
return f"systemctl enable {service}"
elif operation == "disable":
return f"systemctl disable {service}"
else:
raise Exception(f"Unsupported operation: {operation}")
elif "gentoo" in distribution or "alpine" in distribution:
if operation == "start":
return f"rc-service {service} start"
elif operation == "stop":
return f"rc-service {service} stop"
elif operation == "restart":
return f"rc-service {service} restart"
elif operation == "enable":
return f"rc-update add {service} default"
elif operation == "disable":
return f"rc-update del {service} default"
else:
raise Exception(f"Unsupported operation: {operation}")


@@ -1,56 +1,32 @@
import logging
import threading
from pathlib import Path
import os
from .lxc.lxc_utils import load_lxc, get_all_lxcs
from .proxmox.proxmox import ProxmoxHost
from .utils import git_utils
from src.get_path_file import project_path
from src.utils.lxc_utils import load_lxc, get_all_lxcs
def run(args):
# Check if running local or SSH
if args.host is None:
pve = ProxmoxHost(path=Path(args.path))
else:
pve = ProxmoxHost(host=args.host, user=args.username, port=args.port, path=Path(args.path))
pve.connect()
if args.repo is None:
logging.warning("No repo provided, skipping updates/cloning...")
else:
# Check if the repo is already cloned
if not pve.has_directory(pve.repo_path):
logging.info(f"Cloning repository {args.repo} to {args.path}...")
git_utils.clone_repo(linux_machine=pve, url=args.repo, path=pve.repo_path)
if pve.has_directory(Path(pve.repo_path).joinpath(".git")):
logging.info(f"Repository already cloned at {pve.repo_path}, updating...")
git_utils.update_repo(linux_machine=pve, path=pve.repo_path)
if not pve.has_directory(pve.repo_path):
raise FileNotFoundError(f"Repo not found at {pve.repo_path}")
def run():
# Read all files in the resources directory
resources = pve.list_dir(args.path)
# resources = Path(project_path).joinpath("resources").glob("*")
resources = os.listdir(project_path / "resources")
logging.info(f"Resources found: {resources}")
# Go through each LXC file
for resource in resources:
if resource == "lxc":
logging.info("LXC folder found")
# Read all files in the LXC directory
lxc_folders = pve.list_dir(Path(args.path).joinpath(resource))
lxc_folders = os.listdir(project_path / "resources" / "lxc")
for lxc_folder in lxc_folders:
lxc_file_content = pve.read_file(Path(args.path).joinpath(resource, lxc_folder, "config.yml"))
lxc_file = os.path.join(project_path / "resources" / "lxc", lxc_folder, "config.json")
logging.info(f"Reading LXC ID {lxc_file}")
load_lxc(content=lxc_file_content, lxc_id=int(lxc_folder), pve=pve)
# Open the file
with open(lxc_file, "r") as file:
# Load the LXC
load_lxc(file.read())
for lxc in get_all_lxcs():
threading.Thread(target=launch_lxc, args=(lxc,)).start()
print(get_all_lxcs())
def launch_lxc(lxc):
logging.info(f"Loading LXC {lxc.id}")
lxc.create()
lxc.start()
lxc.run_creation()
logging.info(f"Finished loading LXC {lxc.id}")
# print(get_all_lxcs()[0].get_tteck_env_variables())
# get_all_lxcs()[0].create()


@@ -1,307 +0,0 @@
from __future__ import annotations
import fnmatch
import subprocess
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING
from fabric import Connection
from ..machine.machine import LinuxMachine
if TYPE_CHECKING:
from ..lxc.lxc import LXC
class ProxmoxHost(LinuxMachine):
"""Class to represent a Proxmox host."""
def __init__(self, host: str = None, user: str = "root", port: int = 22, path: Path = None):
"""Initialize a Proxmox host.
If no host is provided, it will use the local machine as the Proxmox host.
Parameters
----------
host: str, optional
hostname or IP address of the Proxmox host
user: str, optional
username to use for SSH connection (default: root)
port: int, optional
port to use for SSH connection (default: 22)
"""
super().__init__()
self.host = host
self.user = user
self.port = port
self.connection = None
self.repo_path = path.as_posix()
self.running_lxcs = []
threading.Thread(target=self.update, daemon=True).start()
def update(self):
update_connection = Connection(host=self.host, user=self.user, port=self.port)
while True:
pct_output = update_connection.run("pct list", hide=True, warn=True, encoding="utf-8").stdout.split("\n")[
1:]
for line in pct_output:
if line.strip():
if len(line.split()) == 3:
vm_id, status, name = line.split()
else:
vm_id, status, lock, name = line.split()
if "running" in status:
if vm_id not in self.running_lxcs:
self.running_lxcs.append(int(vm_id))
else:
if vm_id in self.running_lxcs:
self.running_lxcs.remove(int(vm_id))
time.sleep(0.5)
def __str__(self):
return f"ProxmoxHost({self.host}, {self.user}, {self.port})"
def __repr__(self):
return f"ProxmoxHost({self.host}, {self.user}, {self.port})"
def connect(self):
"""Connect to the Proxmox host."""
if self.host is not None:
self.connection = Connection(host=self.host, user=self.user, port=self.port)
def run_command(self, command: str, return_status_code: bool = False, exception_on_exit: bool = True,
exception_on_empty_stdout: bool = False, working_directory=None, **kwargs):
"""Run a command on the Proxmox VE host
The default behavior is as follows :
- runs the command on the Proxmox VE host
- throws an exception if the exit status is not 0
- returns the stdout of the command
-- if the stdout is None or empty, throws an exception
If you don't except an output or depend on the status code returned by the command, you can set return_status_code to True:
- runs the command on the Proxmox VE host
- returns the status code of the command
Parameters
----------
command: str
command to run
return_status_code: bool, optional
should it return the exit code and not the stdout, disables exception_on_exit
exception_on_exit: bool, optional
should an exception be thrown if the exit status code is not 0
even though it's True by default, if you use return_status_code, it is disabled
exception_on_empty_stdout: bool, optional
should an exception be thrown if the stdout is None or empty
shell: bool, optional
should the command be run in a shell or not, see python subprocess documentation for more informations
Returns
-------
str
command output by default
int
command exit code if return_status_code is True
Raises
------
Exception
if the exit status is not 0 and exception_on_exit is True
Exception
if the stdout is None and return_status_code is False
Exception
if the stdout is empty and exception_on_empty_stdout is True
"""
if type(command) == list:
command = ' && '.join(command)
if working_directory:
command = f"cd {working_directory} && {command}"
start_time = time.time()
# Check if host is None, if it is, run the command locally, else run it on the host via SSH
if self.host is None:
command_result = subprocess.run(command, shell=True, capture_output=True, encoding="utf-8")
elif self.connection is not None:
command_result = self.connection.run(command, hide=True, warn=True, encoding="utf-8")
else:
raise Exception("No host or connection provided")
end_time = time.time()
print(f"Command {command} took {end_time - start_time} seconds to run")
# If return code is not 0 and that exception_on_exit is True and return_status_code is False, throw an exception
if command_result.return_code != 0 and exception_on_exit and not return_status_code:
raise Exception(f"Error while running command: \n{command_result.stderr}")
if return_status_code:
return command_result.return_code
# Check if stdout is empty, throw an exception or return empty string depending on exception_on_empty_stdout
if (command_result.stdout is None or command_result.stdout == "") and exception_on_empty_stdout:
raise Exception(
f"Error, no output from command, try using the command with return_status_code instead: \n{command_result.stderr}")
elif command_result.stdout is None or command_result.stdout == "":
return ""
# Decode stdout if it's bytes
if type(command_result.stdout) == bytes:
return command_result.stdout.decode().rstrip()
return command_result.stdout.rstrip()
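# A minimal usage sketch of run_command (illustrative only; host, port and repo path are placeholders):
#
#   pve = ProxmoxHost(host="192.168.1.10", user="root", port=22, path=Path("/opt/deploy-repo"))
#   pve.connect()
#   version = pve.run_command("pveversion")                                  # stdout, raises on non-zero exit
#   exit_code = pve.run_command("pct status 101", return_status_code=True)   # exit code, never raises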
def get_version(self):
"""Get the version of the Proxmox host."""
return self.run_command("pveversion")
def get_all_lxcs(self):
"""Get all the LXCs on the Proxmox host."""
pct_list_output = self.run_command("pct list", exception_on_exit=False)
pct_list_output = pct_list_output.split("\n")[1:]
ids = [int(line.split()[0]) for line in pct_list_output if line.strip()]
return ids
def does_lxc_exist(self, lxc_id):
"""Check if the given LXC exists on the Proxmox host."""
return lxc_id in self.get_all_lxcs()
def is_lxc_running(self, lxc_id):
"""Check if the given LXC is running on the Proxmox host."""
return lxc_id in self.running_lxcs
# Not used anymore because it's too slow, now using an update thread to keep track of running LXCs
# pct_status_output = self.run_command(f"pct status {lxc_id}", exception_on_exit=False)
# return "running" in pct_status_output
def start_lxc(self, lxc_id):
"""Start the given LXC on the Proxmox host."""
if not self.is_lxc_running(lxc_id):
self.run_command(f"pct start {lxc_id}")
def stop_lxc(self, lxc_id):
"""Stop the given LXC on the Proxmox host."""
if self.is_lxc_running(lxc_id):
self.run_command(f"pct stop {lxc_id}")
def reboot_lxc(self, lxc_id):
"""Reboot the given LXC on the Proxmox host."""
self.run_command(f"pct reboot {lxc_id}")
def get_all_vms(self):
"""Get all the VMs on the Proxmox host."""
qm_list_output = self.run_command("qm list")
qm_list_output = qm_list_output.split("\n")[1:]
ids = [line.split()[0] for line in qm_list_output if line.strip()]
return ids
def does_vm_exist(self, vm_id):
"""Check if the given VM exists on the Proxmox host."""
return vm_id in self.get_all_vms()
def is_vm_running(self, vm_id):
"""Check if the given VM is running on the Proxmox host."""
if not self.does_vm_exist(vm_id):
return False
qm_status_output = self.run_command(f"qm status {vm_id}")
return "running" in qm_status_output
def start_vm(self, vm_id):
"""Start the given VM on the Proxmox host."""
if not self.is_vm_running(vm_id):
self.run_command(f"qm start {vm_id}")
def stop_vm(self, vm_id):
"""Stop the given VM on the Proxmox host."""
if self.is_vm_running(vm_id):
self.run_command(f"qm stop {vm_id}")
def reboot_vm(self, vm_id):
"""Reboot the given VM on the Proxmox host."""
self.run_command(f"qm reboot {vm_id}")
def copy_file_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path, permission: int = 644,
owner: str = "root", use_ssh: bool = True):
"""Copy the given file to the given LXC."""
if isinstance(source, Path):
source = str(source.as_posix())
if isinstance(destination, str):
destination = Path(destination)
lxc.create_directory(destination.parent.as_posix(), use_ssh=use_ssh)
self.run_command(f"pct push {lxc.id} {source} {destination.as_posix()}")
if permission != 755:
lxc.run_command(f"chmod {permission} {destination.as_posix()}", return_status_code=True, use_ssh=use_ssh)
if owner != "root":
lxc.run_command(f"chown {owner} {destination.as_posix()}", return_status_code=True)
def copy_folder_to_lxc(self, lxc: LXC, source: str or Path, destination: str or Path, permission: int = 755,
owner: str = "root"):
"""Copy the given folder to the given LXC."""
if isinstance(source, Path):
source = str(source.as_posix())
if isinstance(destination, Path):
destination = str(destination.as_posix())
self.run_command(f"scp -o StrictHostKeyChecking=no -B -r {source} {lxc.get_ssh_string()}:{destination}")
if permission != 755:
lxc.run_command(f"chmod -R {permission} {destination}", return_status_code=True)
if owner != "root":
lxc.run_command(f"chown -R {owner} {destination}", return_status_code=True)
def list_dir(self, directory: str or Path, glob_filter: str = '*'):
"""List the given directory."""
if isinstance(directory, Path):
directory = str(directory.as_posix())
return fnmatch.filter(self.connection.sftp().listdir(path=directory), glob_filter)
def read_file(self, file: str or Path):
"""Read the given file.
Parameters
----------
file: str or Path
The file to read.
Returns
-------
file: file
The file object.
"""
if isinstance(file, Path):
file = str(file.as_posix())
with self.connection.sftp().open(file, 'r') as f:
return f.read()

0
src/utils/__init__.py Normal file
View File

View File

@ -1,108 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..lxc.lxc import LXC
from ..machine.machine import LinuxMachine
def are_all_conditions_met(lxc: LXC):
"""
Check conditions for running the creation steps for an LXC.
Parameters
----------
lxc
Returns
-------
"""
results = []
creation_dict = lxc.creation
steps_dict = lxc.creation["steps"]
# Check the global conditions
if 'conditions' in creation_dict:
global_conditions = creation_dict["conditions"]
for condition in global_conditions:
results.append(verify_condition(lxc, condition_type=condition, data=global_conditions[condition]))
# Check the specific conditions for each step
for step in steps_dict:
if "conditions" in step:
for condition in step["conditions"]:
results.append(verify_condition(lxc, condition_type=condition, data=step["conditions"][condition]))
if len(results) == 0:
return False
return all(results)
def verify_condition(linux_machine: LinuxMachine, condition_type: str, data: list or str):
results = []
if type(condition_type) is str:
data = [data]
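# Note: condition_type is always a dict key (a str), so the wrap above always fires and each
# _check_* helper below receives the configured value unchanged (a single str or a list).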
for condition in data:
print(condition_type, condition)
if condition_type == "file" or condition_type == "files":
results.append(_check_file_condition(linux_machine, condition))
elif condition_type == "folder" or condition_type == "folders":
results.append(_check_folder_condition(linux_machine, condition))
elif condition_type == "program" or condition_type == "programs":
results.append(_check_program_condition(linux_machine, condition))
elif condition_type == "command" or condition_type == "commands":
results.append(_check_command_condition(linux_machine, condition))
elif condition_type == "docker":
results.append(_check_docker_condition(linux_machine, condition))
else:
print("Unknown condition type: " + condition_type)
return False
return all(results)
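# Illustrative shape of a "conditions" mapping handled above (values are examples, not real config):
#
#   {
#       "files": ["/etc/nginx/nginx.conf"],
#       "folders": ["/opt/app"],
#       "programs": ["docker", "git"],
#       "commands": ["hostname myapp"],
#       "docker": "myapp"
#   }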
def verify_step_conditions(linux_machine: LinuxMachine, step: dict):
results = []
if "conditions" in step:
for condition in step["conditions"]:
results.append(
verify_condition(linux_machine, condition_type=condition, data=step["conditions"][condition]))
return all(results) if results else False
def _check_file_condition(linux_machine: LinuxMachine, file: list):
return all([linux_machine.has_file(Path(f)) for f in file])
def _check_folder_condition(linux_machine: LinuxMachine, folder: list):
return all([linux_machine.has_directory(Path(f)) for f in folder])
def _check_program_condition(linux_machine: LinuxMachine, program: list):
return all([linux_machine.has_program(p) for p in program])
def _check_command_condition(linux_machine: LinuxMachine, command: list):
results = []
for c in command:
# Each entry is "<command> <expected output>"
cmd = c.split(" ")[0]
expected = c.split(" ")[1]
results.append(linux_machine.run_command(cmd) == expected)
return all(results)
def _check_docker_condition(linux_machine: LinuxMachine, container: list or str):
container = [container] if isinstance(container, str) else container
running = linux_machine.run_command('docker ps --format "{{.Names}}"')
return all([c in running for c in container])

View File

@ -1,15 +0,0 @@
import logging
from ..machine.machine import LinuxMachine
def clone_repo(linux_machine: LinuxMachine, url, path):
"""Clone the given repository to the given path"""
logging.info(f"Cloning repository {url} to {path}...")
linux_machine.run_command(f"git clone {url} {path}")
def update_repo(linux_machine: LinuxMachine, path):
"""Update the given repository"""
logging.info(f"Updating repository at {path}...")
linux_machine.run_command(f"git -C {path} pull")

309
src/utils/lxc_utils.py Normal file
View File

@ -0,0 +1,309 @@
import json
import logging
from src.utils import proxmox_utils
lxcs = []
def get_all_lxcs():
"""
Get all LXCs
:return: list of all lxcs
"""
return lxcs
def get_lxc(lxc_id):
"""
Get LXC by ID
:param lxc_id: lxc id
:return: lxc object
"""
for lxc in lxcs:
if lxc.get_id() == lxc_id:
return lxc
return None
def load_lxc(file):
"""
Load LXC from JSON file
:param file: json config file
:return:
"""
# Load JSON data
data = json.loads(file)
# Extract values from JSON
lxc_id = data["lxc_id"]
lxc_hostname = data["lxc_hostname"]
os = data["os"]
resources = data["resources"]
network = data["network"]
options = data["options"]
creation = data["creation"]
deploy = data["deploy"]
# Create LXC object
lxc = LXC(lxc_id, lxc_hostname, os, resources, network, options, creation, deploy)
lxcs.append(lxc)
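# Illustrative shape of the JSON config consumed by load_lxc (all values are placeholders):
#
#   {
#     "lxc_id": 101,
#     "lxc_hostname": "adguard",
#     "os": {"name": "debian", "release": "12"},
#     "resources": {"cpu": 2, "memory": 2048, "disk": 8, "storage": "local-lvm"},
#     "network": {"bridge": "vmbr0", "ipv4": "dhcp", "ipv6": "", "mac": "", "gateway": "", "vlan": ""},
#     "options": {"privileged": false, "start_on_boot": true, "password": "changeme", "ssh": "yes"},
#     "creation": {"type": "tteck", "script": "https://example.com/ct/adguard.sh"},
#     "deploy": {}
#   }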
class LXC:
"""
LXC class
"""
def __init__(self, lxc_id, lxc_hostname, os, resources, network, options, creation, deploy):
self.lxc_id = lxc_id
self.lxc_hostname = lxc_hostname
self.os = os
self.os_name = os["name"]
self.os_release = os["release"]
self.resources = resources
self.cpu = resources["cpu"]
self.memory = resources["memory"]
self.disk = resources["disk"]
self.storage = resources["storage"]
self.network = network
self.bridge = network["bridge"]
self.ipv4 = network["ipv4"]
self.ipv6 = network["ipv6"]
self.mac = network["mac"]
self.gateway = network["gateway"]
self.vlan = network["vlan"]
self.options = options
self.privileged = options["privileged"]
self.start_on_boot = options["start_on_boot"]
self.password = options["password"]
self.ssh = options["ssh"]
self.creation = creation
self.deploy = deploy
def __str__(self):
return f"LXC {self.lxc_id} ({self.lxc_hostname})"
def __repr__(self):
return f"LXC {self.lxc_id} ({self.lxc_hostname})"
def __eq__(self, other):
return self.lxc_id == other.lxc_id
def __hash__(self):
return hash(self.lxc_id)
def get_id(self):
"""
Get LXC ID
:return: lxc id
"""
return self.lxc_id
def get_hostname(self):
"""
Get LXC hostname
:return: lxc hostname
"""
return self.lxc_hostname
def get_os(self):
"""
Get OS
:return: os
"""
return self.os
def get_os_name(self):
"""
Get OS name
:return: os name
"""
return self.os_name
def get_os_release(self):
"""
Get OS release
:return: os release
"""
return self.os_release
def get_resources(self):
"""
Get resources
:return: resources
"""
return self.resources
def get_cpu(self):
"""
Get CPU
:return: cpu
"""
return self.cpu
def get_memory(self):
"""
Get memory
:return: memory
"""
return self.memory
def get_disk(self):
"""
Get disk
:return: disk
"""
return self.disk
def get_storage(self):
"""
Get storage
:return: storage
"""
return self.storage
def get_network(self):
"""
Get network
:return: network
"""
return self.network
def get_bridge(self):
"""
Get bridge
:return: bridge
"""
return self.bridge
def get_ipv4(self):
"""
Get IPv4
:return: ipv4
"""
return self.ipv4
def get_ipv6(self):
"""
Get IPv6
:return: ipv6
"""
return self.ipv6
def get_mac(self):
"""
Get MAC
:return: mac
"""
return self.mac
def get_gateway(self):
"""
Get gateway
:return: gateway
"""
return self.gateway
def get_vlan(self):
"""
Get VLAN
:return: vlan
"""
return self.vlan
def get_options(self):
"""
Get options
:return: options
"""
return self.options
def is_privileged(self):
"""
Is privileged
:return: privileged
"""
return self.privileged
def is_start_on_boot(self):
"""
Is start on boot
:return: start on boot
"""
return self.start_on_boot
def get_password(self):
"""
Get password
:return: password
"""
return self.password
def is_ssh_enabled(self):
"""
Is SSH enabled
:return: ssh
"""
return self.ssh
def get_deploy(self):
"""
Get deployments
:return: deployments
"""
return self.deploy
def create(self):
"""
Create LXC
:return:
"""
if proxmox_utils.does_lxc_exist(self.lxc_id):
logging.info(f"LXC {self.lxc_id} already exists, skipping creation")
return
else:
logging.info(f"Creating LXC {self.lxc_id}")
if self.creation['type'] == "tteck":
proxmox_utils.execute_tteck_script(self.creation['script'], self.get_tteck_env_variables())
# Named run_deploy so it does not clash with the `deploy` dict attribute set in __init__
def run_deploy(self):
pass
def get_tteck_env_variables(self):
"""
Get TTECK environment variables to run scripts silently
:return: environment variables
"""
env_variables = {
"CT_TYPE": "1",
"PW": self.password,
"CT_ID": self.lxc_id,
"HN": self.lxc_hostname,
"DISK_SIZE": self.disk,
"CORE_COUNT": self.cpu,
"RAM_SIZE": self.memory,
"BRG": self.bridge,
"NET": self.ipv4,
"GATE": self.gateway,
"DISABLEIP6": "no",
"MTU": "",
"SD": "",
"NS": "",
"MAC": self.mac,
"VLAN": self.vlan,
"SSH": self.ssh,
"VERB": "no"
}
env_command = " && ".join([f"export {name}=\"{value}\"" for name, value in env_variables.items()])
return env_command
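# The returned string is a chain of exports, roughly (values abbreviated):
#
#   export CT_TYPE="1" && export PW="..." && export CT_ID="101" && export HN="adguard" && ... && export VERB="no"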

View File

@ -0,0 +1,68 @@
import logging
import subprocess
def get_pve_version():
"""
Get PVE version
:return: pve version
"""
return run_command_on_pve("pveversion")
def get_pve_hostname():
"""
Get PVE hostname
:return: pve hostname
"""
return run_command_on_pve("hostname")
def does_lxc_exist(lxc_id):
"""
Check if LXC exists
:param lxc_id: lxc id
:return: whether the lxc exists
"""
# TODO: only check in the VMID column
return str(lxc_id) in run_command_on_pve("pct list")
def does_qemu_vm_exist(vm_id):
"""
Check if QEMU VM exists
:param vm_id: vm id
:return: whether the qemu vm exists
"""
# TODO: only check in the VMID column
return str(vm_id) in run_command_on_pve("qm list")
def execute_tteck_script(script_url, env_variables):
"""
Execute a tteck script with pre-set environment variables so it runs silently (non-interactive)
:param script_url: script url (github or other)
:param env_variables: environment variable export commands (a string as returned by LXC.get_tteck_env_variables, or a list of "export NAME=value" strings)
:return: command output
"""
if not isinstance(env_variables, str):
env_variables = " && ".join(env_variables)
return run_command_on_pve(f"{env_variables} && bash -c \"$(wget -qLO - {script_url})\"")
def run_command_on_pve(command):
"""
Run command on PVE
:param command: command
:return: command output
"""
# Run command and return output (not as bytes)
logging.debug(f"Running command on PVE: \n{command}")
return subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8").stdout

View File

@ -1,49 +0,0 @@
from __future__ import annotations
import os
import pathlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..lxc.lxc import LXC
def get_path(lxc: LXC, path: str):
"""Returns the complete path to a global, protected or LXC-relative resource file.
For example, if the path is "global/test.txt", the function will return the complete path to
the file in the resources/ folder.
Another example, if the path is "protected/test.txt", the function will return the complete path to
the file in the protected_resources/ folder.
Other folders can be added like "global/scripts/test.sh" or "protected/scripts/test.sh".
If none of the above conditions are met, the function will return the path relative to the LXC folder
depending on the given LXC object.
Parameters
----------
lxc: LXC
LXC object to use to get the relative path
Can be omitted if you know what you are doing and the path is either global or protected.
path: str
Path to the resource file
Returns
-------
pathlib.Path
Complete path to the resource file
"""
parent = pathlib.Path(lxc.pve.repo_path)
if path.startswith("global/"):
# Use global folder
return parent.joinpath(path[len("global/"):])
elif path.startswith("protected/"):
# Use protected folder
app_path = pathlib.Path(os.path.dirname(__file__)).parent.parent
return app_path.joinpath("protected_resources", path[len("protected/"):])
else:
# Use VM/LXC folder
lxc_id = lxc.id
return parent.joinpath("lxc", str(lxc_id), path)

View File

@ -1,197 +0,0 @@
from __future__ import annotations
import logging
import time
from typing import TYPE_CHECKING
from . import git_utils
from .resources_utils import get_path
from ..lxc import lxc_utils
from ..machine.machine import LinuxMachine
from ..utils import conditions_utils
if TYPE_CHECKING:
pass
def _run_script_step(linux_machine, step):
if "LXC" in str(linux_machine.__class__):
lxc_utils.run_script_step_parser(linux_machine, step)
else:
logging.warning("Script step only supported on LXCs")
pass
def _run_file_create_step(linux_machine, step):
linux_machine.create_file(step["path"], step.get("permission", 644), step.get("owner", "root"))
def _run_file_copy_step(linux_machine, step):
if "LXC" in str(linux_machine.__class__):
linux_machine.pve.copy_file_to_lxc(linux_machine, get_path(linux_machine, step["path"]), step["destination"],
step.get("permission", 644), step.get("owner", "root"))
else:
logging.warning(f"File copy step only supported on LXCs")
def _run_folder_create_step(linux_machine, step):
linux_machine.create_directory(step["path"], step.get("permission", 755), step.get("owner", "root"))
def _run_folder_copy_step(linux_machine, step):
if "LXC" in str(linux_machine.__class__):
linux_machine.pve.copy_folder_to_lxc(linux_machine, get_path(linux_machine, step["path"]), step["destination"],
step.get("permission", 644), step.get("owner", "root"))
else:
logging.warning(f"Folder copy step only supported on LXCs")
def _run_move_step(linux_machine, step):
linux_machine.move(step["source"], step["destination"], step.get("permission", None), step.get("owner", "root"))
def _run_command_step(linux_machine, step):
linux_machine.run_command(command=step["command"], working_directory=step.get("workdir"),
return_status_code=True)
def _run_docker_step(linux_machine, step):
linux_machine.run_docker_command(step["container"], step["command"])
def _run_docker_compose_step(linux_machine, step):
linux_machine.run_docker_compose_command(command=step["command"], working_directory=step.get("working_directory"))
def _run_git_clone_step(linux_machine, step):
git_utils.clone_repo(linux_machine=linux_machine, url=step["url"], path=step["destination"])
def _run_git_pull_step(linux_machine, step):
git_utils.update_repo(linux_machine=linux_machine, path=step["path"])
def _run_download_step(linux_machine, step):
linux_machine.download_file(step["url"], step["destination"])
def _run_install_package_step(linux_machine, step):
linux_machine.install_package(step["package"])
def _run_remove_package_step(linux_machine, step):
linux_machine.remove_package(step["package"])
def _run_start_step(linux_machine):
linux_machine.start()
def _run_stop_step(linux_machine):
linux_machine.stop()
def _run_reboot_step(linux_machine):
linux_machine.reboot()
def _run_service_start_step(linux_machine, step):
linux_machine.start_service(step["service"])
def _run_service_stop_step(linux_machine, step):
linux_machine.stop_service(step["service"])
def _run_service_restart_step(linux_machine, step):
linux_machine.restart_service(step["service"])
def _run_service_enable_step(linux_machine, step):
linux_machine.enable_service(step["service"])
def _run_service_disable_step(linux_machine, step):
linux_machine.disable_service(step["service"])
def _run_replace_in_file_step(linux_machine, step):
# TODO : improve this
linux_machine.replace_in_files(step["path"], step["search"], step["replace"],
case_sensitive=step.get("case_sensitive", False))
def _run_unzip_step(linux_machine, step):
linux_machine.unzip_file(step["path"], step.get("destination"))
def _run_wait_step(step):
time.sleep(step["seconds"])
def run_steps(linux_machine: LinuxMachine, steps: dict):
for index, step in enumerate(steps):
if conditions_utils.verify_step_conditions(linux_machine, step):
logging.info(f"Conditions already met for step {index + 1}/{len(steps)} for LXC {linux_machine.id}, "
f"skipping...")
continue
logging.info(
f"Running step {index + 1}/{len(steps)} for LXC {linux_machine.id}...\n {step['warning'] if 'warning' in step else ''}")
# Run command step
match step['type']:
case "script":
_run_script_step(linux_machine, step)
case "file_create":
_run_file_create_step(linux_machine, step)
case "file_copy":
_run_file_copy_step(linux_machine, step)
case "folder_create":
_run_folder_create_step(linux_machine, step)
case "folder_copy":
_run_folder_copy_step(linux_machine, step)
case "move":
_run_move_step(linux_machine, step)
case "command":
_run_command_step(linux_machine, step)
case "docker":
_run_docker_step(linux_machine, step)
case "docker_compose":
_run_docker_compose_step(linux_machine, step)
case "git_clone":
_run_git_clone_step(linux_machine, step)
case "git_pull":
_run_git_pull_step(linux_machine, step)
case "download":
_run_download_step(linux_machine, step)
case "install_package":
_run_install_package_step(linux_machine, step)
case "remove_package":
_run_remove_package_step(linux_machine, step)
case "start":
_run_start_step(linux_machine)
case "stop":
_run_stop_step(linux_machine)
case "reboot":
_run_reboot_step(linux_machine)
case "service_start":
_run_service_start_step(linux_machine, step)
case "service_stop":
_run_service_stop_step(linux_machine, step)
case "service_restart":
_run_service_restart_step(linux_machine, step)
case "service_enable":
_run_service_enable_step(linux_machine, step)
case "service_disable":
_run_service_disable_step(linux_machine, step)
case "replace_in_file":
_run_replace_in_file_step(linux_machine, step)
case "unzip":
_run_unzip_step(linux_machine, step)
case "wait":
_run_wait_step(step)
case _:
logging.warning(f"Unknown step type {step['type']}")

View File

@ -1,48 +0,0 @@
from pathlib import Path
from ..proxmox.proxmox import ProxmoxHost
def optional(var: any, placeholder: any):
"""Return a placeholder if the given variable is None, otherwise return the variable
Parameters
----------
var: any
Variable to check if it is None
placeholder: any
Placeholder to return if the variable is None
Returns
-------
any
The variable if it is not None, otherwise the placeholder
"""
if var is None:
return placeholder
else:
return var
def copy_local_file_to_pve(pve: ProxmoxHost, local_source: str or Path, destination: str or Path):
"""Copy a local file (in the compiled app for users) to a Proxmox VE host
Parameters
----------
pve: ProxmoxHost
Proxmox VE host to copy the file to
local_source: str or Path
Path to the local file to copy
destination: str or Path
Destination path on the Proxmox VE host
"""
if isinstance(local_source, Path):
local_source = str(local_source.as_posix())
if isinstance(destination, str):
destination = Path(destination)
pve.run_command(f"mkdir -p {destination.parent.as_posix()}")
pve.connection.put(local_source, destination.as_posix())
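# Illustrative usage (the local source and destination paths are placeholders):
#
#   copy_local_file_to_pve(pve, "protected_resources/banner.txt", "/root/banner.txt")
#   theme = optional(user_theme, "default")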