Compare commits
26 Commits
Author | SHA1 | Date | |
---|---|---|---|
f82826fc0a | |||
67df62ff91 | |||
59ff04775c | |||
08d9f78a35 | |||
591396895d | |||
daf2d6d63e | |||
94a65178c0 | |||
b4b0bead89 | |||
723b7b1284 | |||
eaf434efc8 | |||
767d877442 | |||
3e13d1ddb7 | |||
8f2f03f134 | |||
af870c63eb | |||
3a3775f457 | |||
3fc5fb948f | |||
bc16d06ac6 | |||
e67e29facc | |||
7e6736c8b3 | |||
0f72986d2a | |||
ab2c8e0376 | |||
e5b6af8e41 | |||
b771739161 | |||
ad53165be2 | |||
10324f542b | |||
d69ec9c352 |
44
.github/wiki/script-auto-config-linux.sh
vendored
44
.github/wiki/script-auto-config-linux.sh
vendored
@ -1,34 +1,51 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Retrieve the username of the person who invoked sudo
|
||||
USER_BEHIND_SUDO=$(who am i | awk '{print $1}')
|
||||
|
||||
# Function to print colored text
|
||||
print_colored() {
|
||||
local color="$1"
|
||||
local text="$2"
|
||||
echo -e "${color}${text}\033[0m"
|
||||
}
|
||||
|
||||
# Define colors
|
||||
COLOR_RED="\033[0;31m"
|
||||
COLOR_GREEN="\033[0;32m"
|
||||
COLOR_YELLOW="\033[1;33m"
|
||||
COLOR_BLUE="\033[0;34m"
|
||||
|
||||
# Ask for HomeAssistant IP
|
||||
echo "Please enter your HomeAssistant IP address:"
|
||||
print_colored "$COLOR_BLUE" "Please enter your HomeAssistant local IP address (even if behind proxy, need LAN address):"
|
||||
read -r HOMEASSISTANT_IP
|
||||
|
||||
# Enable SSH Server
|
||||
echo "Enabling SSH Server..."
|
||||
print_colored "$COLOR_BLUE" "Enabling SSH Server..."
|
||||
if command -v systemctl &> /dev/null; then
|
||||
sudo systemctl enable --now sshd
|
||||
print_colored "$COLOR_GREEN" "SSH Server enabled successfully."
|
||||
else
|
||||
echo "Systemctl not found. Please enable SSH manually."
|
||||
print_colored "$COLOR_RED" "Systemctl not found. Please enable SSH manually."
|
||||
fi
|
||||
|
||||
# Configure sudoers
|
||||
echo "Configuring sudoers..."
|
||||
echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers
|
||||
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/bin/systemctl, /usr/sbin/pm-suspend, /usr/bin/awk, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot" | sudo tee -a /etc/sudoers
|
||||
print_colored "$COLOR_BLUE" "Configuring sudoers..."
|
||||
echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers > /dev/null
|
||||
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/sbin/pm-suspend, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot, /usr/bin/cat /etc/grub2.cfg, /usr/bin/cat /etc/grub.cfg, /usr/bin/systemctl poweroff, /usr/bin/systemctl reboot, /usr/bin/systemctl suspend" | sudo tee -a /etc/sudoers > /dev/null
|
||||
print_colored "$COLOR_GREEN" "Sudoers file configured successfully."
|
||||
|
||||
# Firewall Configuration
|
||||
echo "Configuring firewall..."
|
||||
print_colored "$COLOR_BLUE" "Configuring firewall..."
|
||||
if command -v ufw &> /dev/null; then
|
||||
sudo ufw allow 22
|
||||
print_colored "$COLOR_GREEN" "Firewall configured to allow SSH."
|
||||
else
|
||||
echo "UFW not found. Please configure the firewall manually (if needed)."
|
||||
print_colored "$COLOR_RED" "UFW not found. Please configure the firewall manually (if needed)."
|
||||
fi
|
||||
|
||||
# Setup xhost for GUI apps
|
||||
echo "Configuring persistent xhost for starting GUI apps (like Steam)..."
|
||||
print_colored "$COLOR_BLUE" "Configuring persistent xhost for starting GUI apps (like Steam)..."
|
||||
COMMANDS="xhost +$HOMEASSISTANT_IP; xhost +localhost"
|
||||
DESKTOP_ENTRY_NAME="EasyComputerManager-AutoStart"
|
||||
DESKTOP_ENTRY_PATH="/home/$USER_BEHIND_SUDO/.config/autostart/$DESKTOP_ENTRY_NAME.desktop"
|
||||
@ -44,8 +61,9 @@ NoDisplay=false
|
||||
X-GNOME-Autostart-enabled=true
|
||||
EOF
|
||||
chmod +x "$DESKTOP_ENTRY_PATH"
|
||||
print_colored "$COLOR_GREEN" "Desktop entry created at $DESKTOP_ENTRY_PATH."
|
||||
|
||||
echo ""
|
||||
echo "Done! Some features may require a reboot to work including:"
|
||||
echo " - Starting GUI apps from HomeAssistant"
|
||||
echo "You can now add your computer to HomeAssistant."
|
||||
print_colored "$COLOR_GREEN" "\nDone! Some features may require a reboot to work including:"
|
||||
print_colored "$COLOR_YELLOW" " - Starting GUI apps from HomeAssistant"
|
||||
print_colored "$COLOR_GREEN" "\nYou can now add your computer to HomeAssistant."
|
||||
print_colored "$COLOR_RED" "\nWARNING : Don't forget to install these packages : gnome-monitor-config, pactl, bluetoothctl"
|
||||
|
6
.github/workflows/hassfest.yaml
vendored
6
.github/workflows/hassfest.yaml
vendored
@ -4,11 +4,11 @@ on:
|
||||
push:
|
||||
pull_request:
|
||||
schedule:
|
||||
- cron: '0 0 * * *'
|
||||
- cron: '0 0 * * *'
|
||||
|
||||
jobs:
|
||||
validate:
|
||||
runs-on: "ubuntu-latest"
|
||||
steps:
|
||||
- uses: "actions/checkout@v4"
|
||||
- uses: "home-assistant/actions/hassfest@master"
|
||||
- uses: "actions/checkout@v4"
|
||||
- uses: "home-assistant/actions/hassfest@master"
|
40
README.md
40
README.md
@ -5,7 +5,9 @@
|
||||
|
||||
![img.png](.images/header.png)
|
||||
|
||||
Easy Computer Manager is a custom integration for Home Assistant that allows you to remotely manage various aspects of your computer, such as sending Wake-On-LAN (WoL) packets, restarting the computer between different operating systems (if dual-boot), adjusting audio configurations, changing monitor settings, and more.
|
||||
Easy Computer Manager is a custom integration for Home Assistant that allows you to remotely manage various aspects of
|
||||
your computer, such as sending Wake-On-LAN (WoL) packets, restarting the computer between different operating systems (
|
||||
if dual-boot), adjusting audio configurations, changing monitor settings, and more.
|
||||
|
||||
## Features
|
||||
|
||||
@ -26,33 +28,55 @@ Easy Computer Manager is a custom integration for Home Assistant that allows you
|
||||
1. Install [HACS](https://hacs.xyz/) if not already installed.
|
||||
2. In Home Assistant, go to "HACS" in the sidebar.
|
||||
3. Click on "Integrations."
|
||||
4. Search for "easy_computer_manager" and click "Install."
|
||||
4. Click on the three dots in the top right corner and select "Custom repositories."
|
||||
5. Paste the following URL in the "Repo" field: https://github.com/M4TH1EU/HA-EasyComputerManager
|
||||
6. Select "Integration" from the "Category" dropdown.
|
||||
7. Click "Add."
|
||||
8. Search for "easy_computer_manager" and click "Install."
|
||||
|
||||
### Manually
|
||||
|
||||
1. Download the latest release from the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager/).
|
||||
2. Extract the downloaded ZIP file.
|
||||
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your Home Assistant instance.
|
||||
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your
|
||||
Home Assistant instance.
|
||||
|
||||
## Configuration
|
||||
# Preparing your computer (required)
|
||||
> [!CAUTION]
|
||||
> Before adding your computer to Home Assistant, ensure that it is properly configured.
|
||||
|
||||
**See wiki page [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Prepare-your-computer).**
|
||||
|
||||
## Add Computer to Home Assistant
|
||||
|
||||
1. In Home Assistant, go to "Configuration" in the sidebar.
|
||||
2. Select "Integrations."
|
||||
3. Click the "+" button to add a new integration.
|
||||
4. Search for "Easy Computer Manager" and select it from the list.
|
||||
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, username, and password for the computer you want to manage.
|
||||
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, mac-adress, username,
|
||||
and password for the computer you want to add.
|
||||
6. Once configured, click "Finish" to add the computer to Home Assistant.
|
||||
|
||||
> [!NOTE]
|
||||
> If you are managing a dual-boot computer, ensure that the "Dual boot system" checkbox is enabled during the configuration.
|
||||
|
||||
## Usage
|
||||
|
||||
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your smart home setup.
|
||||
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the
|
||||
available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your
|
||||
smart home setup.
|
||||
|
||||
## Services
|
||||
A detailed list of available services and their parameters can be found in the wiki [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Services).
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If you encounter any issues during installation or configuration, refer to the troubleshooting section in the [Wiki](./wiki) or seek assistance from the Home Assistant community.
|
||||
If you encounter any issues during installation or configuration, refer to the troubleshooting section in
|
||||
the [Wiki](./wiki) or seek assistance from the Home Assistant community.
|
||||
|
||||
## Contributions
|
||||
|
||||
Contributions are welcome! If you find any bugs or have suggestions for improvements, please open an issue or submit a pull request on the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager).
|
||||
Contributions are welcome! If you find any bugs or have suggestions for improvements, please open an issue or submit a
|
||||
pull request on the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager).
|
||||
|
||||
Happy automating with Easy Computer Manager!
|
@ -3,6 +3,7 @@
|
||||
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from functools import partial
|
||||
|
||||
@ -15,7 +16,7 @@ from homeassistant.core import HomeAssistant, ServiceCall
|
||||
|
||||
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
||||
vol.Required(CONF_MAC): cv.string,
|
||||
@ -39,7 +40,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
if broadcast_port is not None:
|
||||
service_kwargs["port"] = broadcast_port
|
||||
|
||||
_LOGGER.info(
|
||||
LOGGER.info(
|
||||
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
||||
mac_address,
|
||||
broadcast_address,
|
||||
@ -58,11 +59,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
|
||||
)
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.async_forward_entry_setup(
|
||||
entry, "switch"
|
||||
)
|
||||
)
|
||||
await hass.config_entries.async_forward_entry_setups(entry, ["switch"])
|
||||
|
||||
return True
|
||||
|
||||
|
208
custom_components/easy_computer_manager/computer/__init__.py
Normal file
208
custom_components/easy_computer_manager/computer/__init__.py
Normal file
@ -0,0 +1,208 @@
|
||||
import asyncio
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
from wakeonlan import send_magic_packet
|
||||
|
||||
from custom_components.easy_computer_manager import const, LOGGER
|
||||
from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput
|
||||
from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args, format_pactl_commands
|
||||
from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \
|
||||
parse_bluetoothctl
|
||||
from custom_components.easy_computer_manager.computer.ssh_client import SSHClient
|
||||
|
||||
|
||||
class Computer:
|
||||
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
|
||||
dualboot: bool = False) -> None:
|
||||
"""Initialize the Computer object."""
|
||||
self.initialized = False # used to avoid duplicated ssh connections
|
||||
|
||||
self.host = host
|
||||
self.mac = mac
|
||||
self.username = username
|
||||
self._password = password
|
||||
self.port = port
|
||||
self.dualboot = dualboot
|
||||
|
||||
self.operating_system: Optional[OSType] = None
|
||||
self.operating_system_version: Optional[str] = None
|
||||
self.desktop_environment: Optional[str] = None
|
||||
self.windows_entry_grub: Optional[str] = None
|
||||
self.monitors_config: Optional[Dict[str, Any]] = None
|
||||
self.audio_config: Dict[str, Optional[Dict]] = {}
|
||||
self.bluetooth_devices: Dict[str, Any] = {}
|
||||
|
||||
self._connection: SSHClient = SSHClient(host, username, password, port)
|
||||
asyncio.create_task(self._connection.connect(computer=self))
|
||||
|
||||
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
|
||||
"""Update computer details."""
|
||||
if not state or not await self.is_on():
|
||||
LOGGER.debug("Computer is off, skipping update")
|
||||
return
|
||||
|
||||
# Ensure connection is established before updating
|
||||
await self._ensure_connection_alive(timeout)
|
||||
|
||||
# Update tasks
|
||||
await asyncio.gather(
|
||||
self._update_operating_system(),
|
||||
self._update_operating_system_version(),
|
||||
self._update_desktop_environment(),
|
||||
self._update_windows_entry_grub(),
|
||||
self._update_monitors_config(),
|
||||
self._update_audio_config(),
|
||||
self._update_bluetooth_devices()
|
||||
)
|
||||
|
||||
async def _ensure_connection_alive(self, timeout: int) -> None:
|
||||
"""Ensure the SSH connection is alive, reconnect if necessary."""
|
||||
for _ in range(timeout * 4):
|
||||
if self._connection.is_connection_alive():
|
||||
return
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Reconnecting to {self.host}")
|
||||
await self._connection.connect()
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
|
||||
raise ConnectionError("SSH connection could not be re-established")
|
||||
|
||||
async def _update_operating_system(self) -> None:
|
||||
"""Update the operating system information."""
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
async def _update_operating_system_version(self) -> None:
|
||||
"""Update the operating system version."""
|
||||
self.operating_system_version = (await self.run_action("operating_system_version")).output
|
||||
|
||||
async def _update_desktop_environment(self) -> None:
|
||||
"""Update the desktop environment information."""
|
||||
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
|
||||
|
||||
async def _update_windows_entry_grub(self) -> None:
|
||||
"""Update Windows entry in GRUB (if applicable)."""
|
||||
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
|
||||
|
||||
async def _update_monitors_config(self) -> None:
|
||||
"""Update monitors configuration."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
output = (await self.run_action("get_monitors_config")).output
|
||||
self.monitors_config = parse_gnome_monitors_output(output)
|
||||
# TODO: Implement for Windows if needed
|
||||
|
||||
async def _update_audio_config(self) -> None:
|
||||
"""Update audio configuration."""
|
||||
speakers_output = (await self.run_action("get_speakers")).output
|
||||
microphones_output = (await self.run_action("get_microphones")).output
|
||||
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def _update_bluetooth_devices(self) -> None:
|
||||
"""Update Bluetooth devices list."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def _detect_operating_system(self) -> OSType:
|
||||
"""Detect the operating system by running a uname command."""
|
||||
result = await self.run_manually("uname")
|
||||
return OSType.LINUX if result.successful() else OSType.WINDOWS
|
||||
|
||||
async def is_on(self, timeout: int = 1) -> bool:
|
||||
"""Check if the computer is on by pinging it."""
|
||||
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*ping_cmd,
|
||||
stdout=asyncio.subprocess.DEVNULL,
|
||||
stderr=asyncio.subprocess.DEVNULL
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the computer using Wake-on-LAN."""
|
||||
send_magic_packet(self.mac)
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""Shutdown the computer."""
|
||||
await self.run_action("shutdown")
|
||||
|
||||
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
|
||||
"""Restart the computer."""
|
||||
await self.run_action("restart")
|
||||
|
||||
async def put_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep."""
|
||||
await self.run_action("sleep")
|
||||
|
||||
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Set monitors configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
|
||||
|
||||
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
|
||||
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
|
||||
"""Set audio configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
|
||||
for command in pactl_commands:
|
||||
await self.run_action("set_audio_config", params={"args": command})
|
||||
|
||||
async def install_nircmd(self) -> None:
|
||||
"""Install NirCmd tool (Windows specific)."""
|
||||
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
|
||||
await self.run_action("install_nircmd", params={
|
||||
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
|
||||
"install_path": install_path
|
||||
})
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""Start, stop, or exit Steam Big Picture mode."""
|
||||
await self.run_action(f"{action}_steam_big_picture")
|
||||
|
||||
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
|
||||
raise_on_error: bool = False) -> CommandOutput:
|
||||
"""Run a predefined action via SSH."""
|
||||
params = params or {}
|
||||
|
||||
action = const.ACTIONS.get(id)
|
||||
if not action:
|
||||
LOGGER.error(f"Action {id} not found.")
|
||||
return CommandOutput("", 1, "", "Action not found")
|
||||
|
||||
if not self.operating_system:
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
os_commands = action.get(self.operating_system.lower())
|
||||
if not os_commands:
|
||||
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
|
||||
|
||||
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
|
||||
[os_commands.get("command")])
|
||||
required_params = []
|
||||
if "params" in os_commands:
|
||||
required_params = os_commands.get("params", [])
|
||||
|
||||
# Validate parameters
|
||||
if sorted(required_params) != sorted(params.keys()):
|
||||
raise ValueError(f"Invalid/missing parameters for action: {id}")
|
||||
|
||||
for command in commands:
|
||||
for param, value in params.items():
|
||||
command = command.replace(f"%{param}%", str(value))
|
||||
|
||||
result = await self.run_manually(command)
|
||||
if result.successful():
|
||||
return result
|
||||
elif raise_on_error:
|
||||
raise ValueError(f"Command failed: {command}")
|
||||
|
||||
return result
|
||||
|
||||
async def run_manually(self, command: str) -> CommandOutput:
|
||||
"""Run a custom command manually via SSH."""
|
||||
return await self._connection.execute_command(command)
|
18
custom_components/easy_computer_manager/computer/common.py
Normal file
18
custom_components/easy_computer_manager/computer/common.py
Normal file
@ -0,0 +1,18 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class OSType(str, Enum):
|
||||
WINDOWS = "Windows"
|
||||
LINUX = "Linux"
|
||||
MACOS = "MacOS"
|
||||
|
||||
|
||||
class CommandOutput:
|
||||
def __init__(self, command: str, return_code: int, output: str, error: str) -> None:
|
||||
self.command = command
|
||||
self.return_code = return_code
|
||||
self.output = output.strip()
|
||||
self.error = error.strip()
|
||||
|
||||
def successful(self) -> bool:
|
||||
return self.return_code == 0
|
@ -0,0 +1,62 @@
|
||||
def format_gnome_monitors_args(monitors_config: dict):
|
||||
args = []
|
||||
|
||||
monitors_config = monitors_config.get('monitors_config', {})
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||
|
||||
if 'position' in settings:
|
||||
args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||
|
||||
if 'mode' in settings:
|
||||
args.extend(['-m', settings["mode"]])
|
||||
|
||||
if 'scale' in settings:
|
||||
args.extend(['-s', str(settings["scale"])])
|
||||
|
||||
if 'transform' in settings:
|
||||
args.extend(['-t', settings["transform"]])
|
||||
|
||||
return ' '.join(args)
|
||||
|
||||
|
||||
def format_pactl_commands(current_config: {}, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||
output_device: str = "@DEFAULT_SINK@"):
|
||||
"""Change audio configuration on the host system."""
|
||||
|
||||
commands = []
|
||||
|
||||
def get_device_id(device_type, user_device):
|
||||
for device in current_config[device_type]:
|
||||
if device['description'] == user_device:
|
||||
return device['name']
|
||||
return user_device
|
||||
|
||||
# Set default sink and source if not specified
|
||||
if not output_device:
|
||||
output_device = "@DEFAULT_SINK@"
|
||||
if not input_device:
|
||||
input_device = "@DEFAULT_SOURCE@"
|
||||
|
||||
# Set default sink if specified
|
||||
if output_device and output_device != "@DEFAULT_SINK@":
|
||||
output_device = get_device_id('sinks', output_device)
|
||||
commands.append(f"set-default-sink {output_device}")
|
||||
|
||||
# Set default source if specified
|
||||
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||
input_device = get_device_id('sources', input_device)
|
||||
commands.append(f"set-default-source {input_device}")
|
||||
|
||||
# Set sink volume if specified
|
||||
if volume is not None:
|
||||
commands.append(f"set-sink-volume {output_device} {volume}%")
|
||||
|
||||
# Set sink and source mute status if specified
|
||||
if mute is not None:
|
||||
commands.append(f"set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||
commands.append(f"set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||
|
||||
return commands
|
168
custom_components/easy_computer_manager/computer/parser.py
Normal file
168
custom_components/easy_computer_manager/computer/parser.py
Normal file
@ -0,0 +1,168 @@
|
||||
import re
|
||||
|
||||
from custom_components.easy_computer_manager import LOGGER
|
||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
||||
|
||||
|
||||
def parse_gnome_monitors_output(config: str) -> list:
|
||||
"""
|
||||
Parse the GNOME monitors configuration.
|
||||
|
||||
:param config:
|
||||
The output of the gnome-monitor-config list command.
|
||||
|
||||
:type config: str
|
||||
|
||||
:returns: list
|
||||
The parsed monitors configuration.
|
||||
"""
|
||||
|
||||
monitors = []
|
||||
current_monitor = None
|
||||
|
||||
for line in config.split('\n'):
|
||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||
if monitor_match:
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
source, status = monitor_match.groups()
|
||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||
elif current_monitor:
|
||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||
if display_name_match:
|
||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||
elif resolution_match:
|
||||
# Don't include resolutions under 1280x720
|
||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||
|
||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||
if len(current_monitor['resolutions']) > 0:
|
||||
last_resolution = current_monitor['resolutions'][-1]
|
||||
last_resolution_size = last_resolution.split('@')[0]
|
||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||
|
||||
# Only truncate some framerates if the resolution are the same
|
||||
if last_resolution_size == this_resolution_size:
|
||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||
|
||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# If the resolution is different, this adds the new resolution
|
||||
# to the list without truncating
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# This is the first resolution, add it to the list
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
|
||||
return monitors
|
||||
|
||||
|
||||
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
|
||||
"""
|
||||
Parse the pactl audio configuration.
|
||||
|
||||
:param config_speakers:
|
||||
The output of the pactl list sinks command.
|
||||
:param config_microphones:
|
||||
The output of the pactl list sources command.
|
||||
|
||||
:type config_speakers: str
|
||||
:type config_microphones: str
|
||||
|
||||
:returns: dict
|
||||
The parsed audio configuration.
|
||||
|
||||
"""
|
||||
|
||||
config = {'speakers': [], 'microphones': []}
|
||||
|
||||
def parse_device_info(lines, device_type):
|
||||
devices = []
|
||||
current_device = {}
|
||||
|
||||
for line in lines:
|
||||
if line.startswith(f"{device_type} #"):
|
||||
if current_device and "Monitor" not in current_device['description']:
|
||||
devices.append(current_device)
|
||||
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||
elif line.startswith(" Name:"):
|
||||
current_device['name'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" State:"):
|
||||
current_device['state'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" Description:"):
|
||||
current_device['description'] = line.split(":")[1].strip()
|
||||
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
|
||||
return devices
|
||||
|
||||
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
|
||||
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
|
||||
return_as_string: bool = False) -> list | str:
|
||||
"""Parse the bluetoothctl info command.
|
||||
|
||||
:param command:
|
||||
The command output.
|
||||
:param connected_devices_only:
|
||||
Only return connected devices.
|
||||
Will return all devices, connected or not, if False.
|
||||
|
||||
:type command: :class: CommandOutput
|
||||
:type connected_devices_only: bool
|
||||
|
||||
:returns: str | list
|
||||
The parsed bluetooth devices.
|
||||
|
||||
"""
|
||||
|
||||
if not command.successful():
|
||||
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
|
||||
return "" if return_as_string else []
|
||||
else:
|
||||
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
|
||||
return "" if return_as_string else []
|
||||
|
||||
devices = []
|
||||
current_device = None
|
||||
|
||||
for line in command.output.split('\n'):
|
||||
if line.startswith('Device'):
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
current_device = line.split()[1]
|
||||
current_name = None
|
||||
current_connected = None
|
||||
elif 'Name:' in line:
|
||||
current_name = line.split(': ', 1)[1]
|
||||
elif 'Connected:' in line:
|
||||
current_connected = line.split(': ')[1] == 'yes'
|
||||
|
||||
# Add the last device if any
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
|
||||
if connected_devices_only:
|
||||
devices = [device for device in devices if device["connected"] == True]
|
||||
|
||||
return devices
|
104
custom_components/easy_computer_manager/computer/ssh_client.py
Normal file
104
custom_components/easy_computer_manager/computer/ssh_client.py
Normal file
@ -0,0 +1,104 @@
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
import paramiko
|
||||
|
||||
from custom_components.easy_computer_manager import LOGGER
|
||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
||||
|
||||
|
||||
class SSHClient:
|
||||
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self._password = password
|
||||
self.port = port
|
||||
self._connection: Optional[paramiko.SSHClient] = None
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||
self.disconnect()
|
||||
|
||||
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
|
||||
"""Open an SSH connection using Paramiko asynchronously."""
|
||||
if self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is already active.")
|
||||
return
|
||||
|
||||
self.disconnect() # Ensure any previous connection is closed
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
client = paramiko.SSHClient()
|
||||
|
||||
# Set missing host key policy to automatically accept unknown host keys
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
try:
|
||||
# Offload the blocking connect call to a thread
|
||||
await loop.run_in_executor(None, self._blocking_connect, client)
|
||||
self._connection = client
|
||||
LOGGER.debug(f"Connected to {self.host}")
|
||||
|
||||
except (OSError, paramiko.SSHException) as exc:
|
||||
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
|
||||
if not retried:
|
||||
LOGGER.debug(f"Retrying connection to {self.host}...")
|
||||
await self.connect(retried=True) # Retry only once
|
||||
|
||||
finally:
|
||||
if computer is not None and hasattr(computer, "initialized"):
|
||||
computer.initialized = True
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close the SSH connection."""
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
LOGGER.debug(f"Disconnected from {self.host}")
|
||||
self._connection = None
|
||||
|
||||
def _blocking_connect(self, client: paramiko.SSHClient):
|
||||
"""Perform the blocking SSH connection using Paramiko."""
|
||||
client.connect(
|
||||
hostname=self.host,
|
||||
username=self.username,
|
||||
password=self._password,
|
||||
port=self.port,
|
||||
look_for_keys=False, # Set this to True if using private keys
|
||||
allow_agent=False
|
||||
)
|
||||
|
||||
async def execute_command(self, command: str) -> CommandOutput:
|
||||
"""Execute a command on the SSH server asynchronously."""
|
||||
if not self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
|
||||
await self.connect()
|
||||
|
||||
try:
|
||||
# Offload command execution to avoid blocking
|
||||
loop = asyncio.get_running_loop()
|
||||
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
|
||||
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
|
||||
|
||||
except (paramiko.SSHException, EOFError) as exc:
|
||||
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
|
||||
return CommandOutput(command, -1, "", "")
|
||||
|
||||
def is_connection_alive(self) -> bool:
|
||||
"""Check if the SSH connection is still alive."""
|
||||
if self._connection is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
transport = self._connection.get_transport()
|
||||
transport.send_ignore()
|
||||
|
||||
self._connection.exec_command('ls', timeout=1)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
return False
|
40
custom_components/easy_computer_manager/computer/utils.py
Normal file
40
custom_components/easy_computer_manager/computer/utils.py
Normal file
@ -0,0 +1,40 @@
|
||||
async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
|
||||
"""Return debug information about the host system."""
|
||||
|
||||
data = {
|
||||
'os': {
|
||||
'name': computer.operating_system,
|
||||
'version': computer.operating_system_version,
|
||||
'desktop_environment': computer.desktop_environment
|
||||
},
|
||||
'connection': {
|
||||
'host': computer.host,
|
||||
'mac': computer.mac,
|
||||
'username': computer.username,
|
||||
'port': computer.port,
|
||||
'dualboot': computer.dualboot,
|
||||
'is_on': await computer.is_on(),
|
||||
'is_connected': computer._connection.is_connection_alive()
|
||||
},
|
||||
'grub': {
|
||||
'windows_entry': computer.windows_entry_grub
|
||||
},
|
||||
'audio': {
|
||||
'speakers': computer.audio_config.get('speakers'),
|
||||
'microphones': computer.audio_config.get('microphones')
|
||||
},
|
||||
'monitors': computer.monitors_config,
|
||||
'bluetooth_devices': computer.bluetooth_devices
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_bluetooth_devices_as_str(computer: 'Computer') -> str:
|
||||
"""Return the bluetooth devices as a string."""
|
||||
devices = computer.bluetooth_devices
|
||||
|
||||
if not devices:
|
||||
return ""
|
||||
|
||||
return "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
@ -7,9 +7,8 @@ from typing import Any
|
||||
import voluptuous as vol
|
||||
from homeassistant import config_entries, exceptions
|
||||
from homeassistant.core import HomeAssistant
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
|
||||
from . import utils
|
||||
from .computer import Computer
|
||||
from .const import DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -40,6 +39,8 @@ class Hub:
|
||||
self._name = host
|
||||
self._id = host.lower()
|
||||
|
||||
self.computer = Computer(host, "", username, password, port)
|
||||
|
||||
@property
|
||||
def hub_id(self) -> str:
|
||||
"""ID for dummy."""
|
||||
@ -48,9 +49,11 @@ class Hub:
|
||||
async def test_connection(self) -> bool:
|
||||
"""Test connectivity to the computer is OK."""
|
||||
try:
|
||||
return utils.test_connection(
|
||||
utils.create_ssh_connection(self._host, self._username, self._password, self._port))
|
||||
except AuthenticationException:
|
||||
# TODO: check if reachable
|
||||
_LOGGER.info("Testing connection to %s", self._host)
|
||||
return True
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
@ -63,6 +66,7 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]:
|
||||
|
||||
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
|
||||
|
||||
_LOGGER.info("Validating configuration")
|
||||
if not await hub.test_connection():
|
||||
raise CannotConnect
|
||||
|
||||
@ -81,7 +85,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
try:
|
||||
info = await validate_input(self.hass, user_input)
|
||||
return self.async_create_entry(title=info["title"], data=user_input)
|
||||
except (AuthenticationException, CannotConnect, InvalidHost) as ex:
|
||||
except (CannotConnect, InvalidHost) as ex:
|
||||
errors["base"] = str(ex)
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Unexpected exception: %s", ex)
|
||||
|
@ -11,3 +11,87 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config"
|
||||
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
|
||||
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
|
||||
SERVICE_DEBUG_INFO = "debug_info"
|
||||
|
||||
|
||||
ACTIONS = {
|
||||
"operating_system": {
|
||||
"linux": ["uname"]
|
||||
},
|
||||
"operating_system_version": {
|
||||
"windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'],
|
||||
"linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"]
|
||||
},
|
||||
"desktop_environment": {
|
||||
"linux": ["for session in $(ls /usr/bin/*session 2>/dev/null); do basename $session | sed 's/-session//'; done | grep -E 'gnome|kde|xfce|mate|lxde|cinnamon|budgie|unity' | head -n 1"],
|
||||
"windows": ["echo Windows"]
|
||||
},
|
||||
"shutdown": {
|
||||
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"],
|
||||
"linux": ["sudo /sbin/shutdown -h now", "sudo /sbin/init 0", "sudo /usr/bin/systemctl poweroff"]
|
||||
},
|
||||
"restart": {
|
||||
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"],
|
||||
"linux": ["sudo /sbin/shutdown -r now", "sudo /sbin/init 6", "sudo /usr/bin/systemctl reboot"]
|
||||
},
|
||||
"sleep": {
|
||||
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"],
|
||||
"linux": ["sudo /usr/bin/systemctl suspend", "sudo /usr/sbin/pm-suspend"]
|
||||
},
|
||||
"get_windows_entry_grub": {
|
||||
"linux": ["sudo /usr/bin/cat /etc/grub2.cfg | awk -F \"'\" '/windows/ {print $2}'",
|
||||
"sudo /usr/bin/cat /etc/grub.cfg | awk -F \"'\" '/windows/ {print $2}'"]
|
||||
},
|
||||
"set_grub_entry": {
|
||||
"linux": {
|
||||
"commands": ["sudo /usr/sbin/grub-reboot %grub-entry%", "sudo /usr/sbin/grub2-reboot %grub-entry%"],
|
||||
"params": ["grub-entry"],
|
||||
}
|
||||
},
|
||||
"get_monitors_config": {
|
||||
"linux": ["gnome-monitor-config list"]
|
||||
},
|
||||
"set_monitors_config": {
|
||||
"linux": {
|
||||
"gnome": {
|
||||
"command": "gnome-monitor-config set %args%",
|
||||
"params": ["args"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"get_speakers": {
|
||||
"linux": ["LANG=en_US.UTF-8 pactl list sinks"]
|
||||
},
|
||||
"get_microphones": {
|
||||
"linux": ["LANG=en_US.UTF-8 pactl list sources"]
|
||||
},
|
||||
"set_audio_config": {
|
||||
"linux": {
|
||||
"command": "LANG=en_US.UTF-8 pactl %args%",
|
||||
"params": ["args"]
|
||||
}
|
||||
},
|
||||
"get_bluetooth_devices": {
|
||||
"linux": {
|
||||
"command": "bluetoothctl info",
|
||||
"raise_on_error": False,
|
||||
}
|
||||
},
|
||||
"install_nirmcd": {
|
||||
"windows": {
|
||||
"command": "powershell -Command \"Invoke-WebRequest -Uri %download_url% -OutFile %install_path%\\nircmd.zip -UseBasicParsing; Expand-Archive %install_path%\\nircmd.zip -DestinationPath %install_path%; Remove-Item %install_path%\\nircmd.zip\"",
|
||||
"params": ["download_url", "install_path"]
|
||||
}
|
||||
},
|
||||
"start_steam_big_picture": {
|
||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||
"windows": "start steam://open/bigpicture"
|
||||
},
|
||||
"stop_steam_big_picture": {
|
||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||
},
|
||||
"exit_steam_big_picture": {
|
||||
"linux": "", # TODO: find a way to exit steam big picture
|
||||
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||
},
|
||||
}
|
@ -2,8 +2,7 @@
|
||||
"domain": "easy_computer_manager",
|
||||
"name": "Easy Computer Manager",
|
||||
"codeowners": [
|
||||
"@M4TH1EU",
|
||||
"@ntilley905"
|
||||
"@M4TH1EU"
|
||||
],
|
||||
"config_flow": true,
|
||||
"dependencies": [],
|
||||
@ -11,10 +10,8 @@
|
||||
"iot_class": "local_polling",
|
||||
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
|
||||
"requirements": [
|
||||
"construct==2.10.70",
|
||||
"wakeonlan==3.1.0",
|
||||
"fabric2==3.2.2",
|
||||
"paramiko==3.4.0"
|
||||
"asyncssh==2.16.0"
|
||||
],
|
||||
"version": "1.3.0"
|
||||
"version": "2.0.0"
|
||||
}
|
||||
|
@ -1,80 +1,53 @@
|
||||
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import subprocess as sp
|
||||
from typing import Any
|
||||
from typing import Any, Dict
|
||||
|
||||
import voluptuous as vol
|
||||
import wakeonlan
|
||||
from homeassistant.components.switch import (SwitchEntity)
|
||||
from homeassistant.components.switch import SwitchEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_MAC,
|
||||
CONF_NAME,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME, )
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import (
|
||||
device_registry as dr,
|
||||
entity_platform,
|
||||
CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.helpers import entity_platform, device_registry as dr
|
||||
from homeassistant.helpers.config_validation import make_entity_service_schema
|
||||
from homeassistant.helpers.device_registry import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
|
||||
from . import utils
|
||||
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
||||
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
from .computer import OSType, Computer
|
||||
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
|
||||
from .const import (
|
||||
DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
|
||||
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
|
||||
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
# Retrieve the data from the config flow
|
||||
mac_address: str = config.data.get(CONF_MAC)
|
||||
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS)
|
||||
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT)
|
||||
host: str = config.data.get(CONF_HOST)
|
||||
name: str = config.data.get(CONF_NAME)
|
||||
dualboot: bool = config.data.get("dualboot")
|
||||
username: str = config.data.get(CONF_USERNAME)
|
||||
password: str = config.data.get(CONF_PASSWORD)
|
||||
port: int | None = config.data.get(CONF_PORT)
|
||||
"""Set up the computer switch from a config entry."""
|
||||
mac_address = config.data[CONF_MAC]
|
||||
host = config.data[CONF_HOST]
|
||||
name = config.data[CONF_NAME]
|
||||
dualboot = config.data.get("dualboot", False)
|
||||
username = config.data[CONF_USERNAME]
|
||||
password = config.data[CONF_PASSWORD]
|
||||
port = config.data.get(CONF_PORT)
|
||||
|
||||
# Register the computer switch
|
||||
async_add_entities(
|
||||
[
|
||||
ComputerSwitch(
|
||||
hass,
|
||||
name,
|
||||
host,
|
||||
mac_address,
|
||||
dualboot,
|
||||
username,
|
||||
password,
|
||||
port,
|
||||
),
|
||||
],
|
||||
host is not None,
|
||||
[ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
|
||||
True
|
||||
)
|
||||
|
||||
platform = entity_platform.async_get_current_platform()
|
||||
|
||||
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse)
|
||||
# Service registrations
|
||||
services = [
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
|
||||
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
|
||||
@ -91,18 +64,18 @@ async def async_setup_entry(
|
||||
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
|
||||
]
|
||||
|
||||
# Register the services that depends on the switch
|
||||
for service in services:
|
||||
# Register services with their schemas
|
||||
for service_name, schema, supports_response in services:
|
||||
platform.async_register_entity_service(
|
||||
service[0],
|
||||
make_entity_service_schema(service[1]),
|
||||
service[0],
|
||||
supports_response=service[2]
|
||||
service_name,
|
||||
make_entity_service_schema(schema),
|
||||
service_name,
|
||||
supports_response=supports_response
|
||||
)
|
||||
|
||||
|
||||
class ComputerSwitch(SwitchEntity):
|
||||
"""Representation of a computer switch."""
|
||||
"""Representation of a computer switch entity."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -110,216 +83,121 @@ class ComputerSwitch(SwitchEntity):
|
||||
name: str,
|
||||
host: str | None,
|
||||
mac_address: str,
|
||||
# broadcast_address: str | None,
|
||||
# broadcast_port: int | None,
|
||||
dualboot: bool | False,
|
||||
dualboot: bool,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int | None,
|
||||
) -> None:
|
||||
"""Initialize the WOL switch."""
|
||||
|
||||
self._hass = hass
|
||||
"""Initialize the computer switch entity."""
|
||||
self.hass = hass
|
||||
self._attr_name = name
|
||||
self._host = host
|
||||
self._mac_address = mac_address
|
||||
# self._broadcast_address = broadcast_address
|
||||
# self._broadcast_port = broadcast_port
|
||||
self._dualboot = dualboot
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._port = port
|
||||
self._state = False
|
||||
self._attr_assumed_state = host is None
|
||||
self._attr_should_poll = bool(not self._attr_assumed_state)
|
||||
self._attr_unique_id = dr.format_mac(mac_address)
|
||||
self._state = False
|
||||
self._attr_should_poll = not self._attr_assumed_state
|
||||
self._attr_extra_state_attributes = {}
|
||||
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||
|
||||
self.computer = Computer(host, mac_address, username, password, port, dualboot)
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo | None:
|
||||
"""Return the device information."""
|
||||
if self._host is None:
|
||||
return None
|
||||
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return device info for the registry."""
|
||||
return DeviceInfo(
|
||||
identifiers={(DOMAIN, self._mac_address)},
|
||||
identifiers={(DOMAIN, self.computer.mac)},
|
||||
name=self._attr_name,
|
||||
manufacturer="Generic",
|
||||
model="Computer",
|
||||
sw_version=utils.get_operating_system_version(
|
||||
self._connection) if self._state else "Unknown",
|
||||
connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
|
||||
sw_version=self.computer.operating_system_version,
|
||||
connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)},
|
||||
)
|
||||
|
||||
@property
|
||||
def icon(self) -> str:
|
||||
"""Return the icon to use in the frontend, if any."""
|
||||
return "mdi:monitor" if self._state else "mdi:monitor-off"
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
"""Return true if the computer switch is on."""
|
||||
"""Return true if the computer is on."""
|
||||
return self._state
|
||||
|
||||
def turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer on using wake on lan."""
|
||||
service_kwargs: dict[str, Any] = {}
|
||||
# if self._broadcast_address is not None:
|
||||
# service_kwargs["ip_address"] = self._broadcast_address
|
||||
# if self._broadcast_port is not None:
|
||||
# service_kwargs["port"] = self._broadcast_port
|
||||
|
||||
_LOGGER.debug(
|
||||
"Send magic packet to mac %s (broadcast: %s, port: %s)",
|
||||
self._mac_address,
|
||||
# self._broadcast_address,
|
||||
# self._broadcast_port,
|
||||
)
|
||||
|
||||
wakeonlan.send_magic_packet(self._mac_address, **service_kwargs)
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer on using Wake-on-LAN."""
|
||||
await self.computer.start()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
self._state = True
|
||||
self.async_write_ha_state()
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer off using appropriate shutdown command based on running OS and/or distro."""
|
||||
utils.shutdown_system(self._connection)
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer off via shutdown command."""
|
||||
await self.computer.shutdown()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
self._state = False
|
||||
self.async_write_ha_state()
|
||||
|
||||
def restart_to_windows_from_linux(self) -> None:
|
||||
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
|
||||
async def async_update(self) -> None:
|
||||
"""Update the state by checking if the computer is on."""
|
||||
is_on = await self.computer.is_on()
|
||||
if self.is_on != is_on:
|
||||
self._state = is_on
|
||||
# self.async_write_ha_state()
|
||||
|
||||
if self._dualboot:
|
||||
utils.restart_to_windows_from_linux(self._connection)
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
def restart_to_linux_from_windows(self) -> None:
|
||||
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
|
||||
|
||||
if self._dualboot:
|
||||
# TODO: check for default grub entry and adapt accordingly
|
||||
utils.restart_system(self._connection)
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
def put_computer_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep using appropriate sleep command based on running OS and/or distro."""
|
||||
utils.sleep_system(self._connection)
|
||||
|
||||
def start_computer_to_windows(self) -> None:
|
||||
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
|
||||
self.turn_on()
|
||||
|
||||
if self._dualboot:
|
||||
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||
self._hass.loop.create_task(self.service_restart_to_windows_from_linux())
|
||||
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
async def service_restart_to_windows_from_linux(self) -> None:
|
||||
"""Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows."""
|
||||
while not self.is_on:
|
||||
await asyncio.sleep(3)
|
||||
|
||||
await utils.restart_to_windows_from_linux(self._connection)
|
||||
|
||||
def restart_computer(self) -> None:
|
||||
"""Restart the computer using appropriate restart command based on running OS and/or distro."""
|
||||
|
||||
# TODO: check for default grub entry and adapt accordingly
|
||||
if self._dualboot and not utils.is_unix_system(connection=self._connection):
|
||||
utils.restart_system(self._connection)
|
||||
|
||||
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||
self.restart_to_windows_from_linux()
|
||||
else:
|
||||
utils.restart_system(self._connection)
|
||||
|
||||
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
|
||||
"""Change the monitors configuration using a YAML config file."""
|
||||
if monitors_config is not None and len(monitors_config) > 0:
|
||||
utils.change_monitors_config(self._connection, monitors_config)
|
||||
else:
|
||||
raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.")
|
||||
|
||||
def steam_big_picture(self, action: str) -> None:
|
||||
"""Controls Steam Big Picture mode."""
|
||||
|
||||
if action is not None:
|
||||
utils.steam_big_picture(self._connection, action)
|
||||
else:
|
||||
raise HomeAssistantError("The 'action' parameter must be specified.")
|
||||
|
||||
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
|
||||
output_device: str | None = None) -> None:
|
||||
"""Change the audio configuration using a YAML config file."""
|
||||
utils.change_audio_config(self._connection, volume, mute, input_device, output_device)
|
||||
|
||||
def update(self) -> None:
|
||||
"""Ping the computer to see if it is online and update the state."""
|
||||
timeout = 1
|
||||
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)]
|
||||
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
|
||||
self._state = not bool(status)
|
||||
|
||||
# Update the state attributes and the connection only if the computer is on
|
||||
if self._state:
|
||||
if self._connection is None or not utils.test_connection(self._connection):
|
||||
self.renew_ssh_connection()
|
||||
|
||||
if not self._state:
|
||||
return
|
||||
# If the computer is on, update its attributes
|
||||
if is_on:
|
||||
await self.computer.update(is_on)
|
||||
|
||||
self._attr_extra_state_attributes = {
|
||||
"operating_system": utils.get_operating_system(self._connection),
|
||||
"operating_system_version": utils.get_operating_system_version(self._connection),
|
||||
"mac_address": self._mac_address,
|
||||
"ip_address": self._host,
|
||||
"connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True),
|
||||
"operating_system": self.computer.operating_system,
|
||||
"operating_system_version": self.computer.operating_system_version,
|
||||
"mac_address": self.computer.mac,
|
||||
"ip_address": self.computer.host,
|
||||
"connected_devices": get_bluetooth_devices_as_str(self.computer),
|
||||
}
|
||||
|
||||
def renew_ssh_connection(self) -> None:
|
||||
"""Renew the SSH connection."""
|
||||
_LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username)
|
||||
# Service methods for various functionalities
|
||||
async def restart_to_windows_from_linux(self) -> None:
|
||||
"""Restart the computer from Linux to Windows."""
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
if self._connection is not None:
|
||||
self._connection.close()
|
||||
async def restart_to_linux_from_windows(self) -> None:
|
||||
"""Restart the computer from Windows to Linux."""
|
||||
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
|
||||
|
||||
try:
|
||||
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||
self._connection.open()
|
||||
except AuthenticationException as error:
|
||||
_LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error)
|
||||
self._state = False
|
||||
except Exception as error:
|
||||
_LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error)
|
||||
async def put_computer_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep."""
|
||||
await self.computer.put_to_sleep()
|
||||
|
||||
# Check if the error is due to timeout
|
||||
if "timed out" in str(error):
|
||||
_LOGGER.warning(
|
||||
"Computer at %s does not respond to the SSH request. Possible causes: might be offline, "
|
||||
"the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.",
|
||||
self._host
|
||||
)
|
||||
async def start_computer_to_windows(self) -> None:
|
||||
"""Start the computer to Windows after booting into Linux first."""
|
||||
await self.computer.start()
|
||||
|
||||
self._state = False
|
||||
async def wait_and_reboot() -> None:
|
||||
"""Wait until the computer is on, then restart to Windows."""
|
||||
while not await self.computer.is_on():
|
||||
await asyncio.sleep(3)
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
def debug_info(self) -> ServiceResponse:
|
||||
"""Prints debug info."""
|
||||
return utils.get_debug_info(self._connection)
|
||||
self.hass.loop.create_task(wait_and_reboot())
|
||||
|
||||
async def restart_computer(self) -> None:
|
||||
"""Restart the computer."""
|
||||
await self.computer.restart()
|
||||
|
||||
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Change the monitor configuration."""
|
||||
await self.computer.set_monitors_config(monitors_config)
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""Control Steam Big Picture mode."""
|
||||
await self.computer.steam_big_picture(action)
|
||||
|
||||
async def change_audio_config(
|
||||
self, volume: int | None = None, mute: bool | None = None,
|
||||
input_device: str | None = None, output_device: str | None = None
|
||||
) -> None:
|
||||
"""Change the audio configuration."""
|
||||
await self.computer.set_audio_config(volume, mute, input_device, output_device)
|
||||
|
||||
async def debug_info(self) -> ServiceResponse:
|
||||
"""Return debug information."""
|
||||
return await format_debug_information(self.computer)
|
||||
|
@ -1,536 +0,0 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
import fabric2
|
||||
from fabric2 import Connection
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# _LOGGER.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def create_ssh_connection(host: str, username: str, password: str, port=22):
|
||||
"""Create an SSH connection to a host using a username and password specified in the config flow."""
|
||||
conf = fabric2.Config()
|
||||
conf.run.hide = True
|
||||
conf.run.warn = True
|
||||
conf.warn = True
|
||||
conf.sudo.password = password
|
||||
conf.password = password
|
||||
|
||||
connection = Connection(
|
||||
host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password},
|
||||
config=conf
|
||||
)
|
||||
|
||||
_LOGGER.info("Successfully created SSH connection to %s using username %s", host, username)
|
||||
|
||||
return connection
|
||||
|
||||
|
||||
def test_connection(connection: Connection):
|
||||
"""Test the connection to the host by running a simple command."""
|
||||
try:
|
||||
connection.run('ls')
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def is_unix_system(connection: Connection):
|
||||
"""Return a boolean based on get_operating_system result."""
|
||||
return get_operating_system(connection) == "Linux/Unix"
|
||||
|
||||
|
||||
def get_operating_system_version(connection: Connection, is_unix=None):
|
||||
"""Return the running operating system name and version."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
if is_unix:
|
||||
result = connection.run(
|
||||
"awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo").stdout
|
||||
if result == "":
|
||||
result = connection.run("lsb_release -a | awk '/Description/ {print $2, $3, $4}'").stdout
|
||||
|
||||
return result
|
||||
else:
|
||||
return connection.run(
|
||||
'for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i').stdout
|
||||
|
||||
|
||||
def get_operating_system(connection: Connection):
|
||||
"""Return the running operating system type."""
|
||||
# TODO: might be a better way to do this
|
||||
result = connection.run("uname")
|
||||
if result.return_code == 0:
|
||||
return "Linux/Unix"
|
||||
else:
|
||||
return "Windows/Other"
|
||||
|
||||
|
||||
def shutdown_system(connection: Connection, is_unix=None):
|
||||
"""Shutdown the system."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
shutdown_commands = {
|
||||
"unix": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"],
|
||||
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"]
|
||||
}
|
||||
|
||||
for command in shutdown_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System shutting down on %s.", connection.host)
|
||||
connection.close()
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot shutdown system running at {connection.host}, all methods failed.")
|
||||
|
||||
|
||||
def restart_system(connection: Connection, is_unix=None):
|
||||
"""Restart the system."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
restart_commands = {
|
||||
"unix": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"],
|
||||
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"]
|
||||
}
|
||||
|
||||
for command in restart_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System restarting on %s.", connection.host)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot restart system running at {connection.host}, all methods failed.")
|
||||
|
||||
|
||||
def sleep_system(connection: Connection, is_unix=None):
|
||||
"""Put the system to sleep."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
sleep_commands = {
|
||||
"unix": ["sudo systemctl suspend", "sudo pm-suspend"],
|
||||
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"]
|
||||
}
|
||||
|
||||
for command in sleep_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System sleeping on %s.", connection.host)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot put system running at {connection.host} to sleep, all methods failed.")
|
||||
|
||||
|
||||
def get_windows_entry_in_grub(connection: Connection):
|
||||
"""
|
||||
Grabs the Windows entry name in GRUB.
|
||||
Used later with grub-reboot to specify which entry to boot.
|
||||
"""
|
||||
commands = [
|
||||
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
|
||||
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"
|
||||
]
|
||||
|
||||
for command in commands:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0 and result.stdout.strip():
|
||||
_LOGGER.debug("Found Windows entry in GRUB: " + result.stdout.strip())
|
||||
return result.stdout.strip()
|
||||
|
||||
_LOGGER.error("Could not find Windows entry in GRUB for system running at %s.", connection.host)
|
||||
return None
|
||||
|
||||
|
||||
def restart_to_windows_from_linux(connection: Connection):
|
||||
"""Restart a running Linux system to Windows."""
|
||||
|
||||
if not is_unix_system(connection):
|
||||
raise HomeAssistantError(f"System running at {connection.host} is not a Linux system.")
|
||||
|
||||
windows_entry = get_windows_entry_in_grub(connection)
|
||||
|
||||
if windows_entry is not None:
|
||||
reboot_commands = ["sudo grub-reboot", "sudo grub2-reboot"]
|
||||
|
||||
for reboot_command in reboot_commands:
|
||||
result = connection.run(f"{reboot_command} \"{windows_entry}\"")
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("Rebooting to Windows")
|
||||
restart_system(connection)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Failed to restart system running on {connection.host} to Windows from Linux.")
|
||||
else:
|
||||
raise HomeAssistantError(f"Could not find Windows entry in grub for system running at {connection.host}.")
|
||||
|
||||
|
||||
def change_monitors_config(connection: Connection, monitors_config: dict):
|
||||
"""Change monitors configuration on the host (Linux + Gnome, and partial Windows support)."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
command_parts = ["gnome-monitor-config", "set"]
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
command_parts.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||
|
||||
if 'position' in settings:
|
||||
command_parts.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||
|
||||
if 'mode' in settings:
|
||||
command_parts.extend(['-m', settings["mode"]])
|
||||
|
||||
if 'scale' in settings:
|
||||
command_parts.extend(['-s', str(settings["scale"])])
|
||||
|
||||
if 'transform' in settings:
|
||||
command_parts.extend(['-t', settings["transform"]])
|
||||
|
||||
command = ' '.join(command_parts)
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||
|
||||
# Run it once again, it fixes some strange Gnome display bug sometimes and it doesn't hurt
|
||||
connection.run(command)
|
||||
else:
|
||||
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||
connection.host)
|
||||
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
# TODO: Implement Windows support using NIRCMD
|
||||
command_parts = ["nircmd.exe", "setdisplay"]
|
||||
# setdisplay {monitor:index/name} [width] [height] [color bits] {refresh rate} {-updatereg} {-allusers}
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
command_parts.extend(
|
||||
[f'{monitor} -primary' if settings.get('primary', False) else f'{monitor} -secondary'])
|
||||
|
||||
if 'resolution' in settings:
|
||||
command_parts.extend([str(settings["resolution"][0]), str(settings["resolution"][1])])
|
||||
|
||||
if 'refresh_rate' in settings:
|
||||
command_parts.extend(['-hz', str(settings["refresh_rate"])])
|
||||
|
||||
if 'color_bits' in settings:
|
||||
command_parts.extend(['-bits', str(settings["color_bits"])])
|
||||
|
||||
command = ' '.join(command_parts)
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||
else:
|
||||
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||
connection.host)
|
||||
|
||||
|
||||
def silent_install_nircmd(connection: Connection):
|
||||
"""Silently install NIRCMD on a Windows system."""
|
||||
|
||||
if not is_unix_system(connection):
|
||||
download_url = "https://www.nirsoft.net/utils/nircmd.zip"
|
||||
install_path = f"C:\\Users\\{connection.user}\\AppData\\Local\\EasyComputerManager"
|
||||
|
||||
# Download and unzip NIRCMD
|
||||
download_command = f"powershell -Command \"Invoke-WebRequest -Uri {download_url} -OutFile {install_path}\\nircmd.zip -UseBasicParsing\""
|
||||
unzip_command = f"powershell -Command \"Expand-Archive {install_path}\\nircmd.zip -DestinationPath {install_path}\""
|
||||
remove_zip_command = f"powershell -Command \"Remove-Item {install_path}\\nircmd.zip\""
|
||||
|
||||
commands = [download_command, unzip_command, remove_zip_command]
|
||||
|
||||
for command in commands:
|
||||
result = connection.run(command)
|
||||
if result.return_code != 0:
|
||||
_LOGGER.error("Could not install NIRCMD on system running on %s.", connection.host)
|
||||
return
|
||||
|
||||
|
||||
def get_monitors_config(connection: Connection) -> dict:
|
||||
"""Parse the output of the gnome-monitor-config command to get the current monitor configuration."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run("gnome-monitor-config list")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get monitors config on system running at {connection.host}.")
|
||||
|
||||
monitors = []
|
||||
current_monitor = None
|
||||
|
||||
for line in result.stdout.split('\n'):
|
||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||
if monitor_match:
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
source, status = monitor_match.groups()
|
||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||
elif current_monitor:
|
||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||
if display_name_match:
|
||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||
elif resolution_match:
|
||||
# Don't include resolutions under 1280x720
|
||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||
|
||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||
if len(current_monitor['resolutions']) > 0:
|
||||
last_resolution = current_monitor['resolutions'][-1]
|
||||
last_resolution_size = last_resolution.split('@')[0]
|
||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||
|
||||
# Only truncate some framerates if the resolution are the same
|
||||
if last_resolution_size == this_resolution_size:
|
||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||
|
||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# If the resolution is different, this adds the new resolution
|
||||
# to the list without truncating
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# This is the first resolution, add it to the list
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
|
||||
return monitors
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def steam_big_picture(connection: Connection, action: str):
|
||||
"""Controls Steam in Big Picture mode on the host."""
|
||||
|
||||
_LOGGER.debug(f"Running Steam Big Picture action {action} on system running at {connection.host}.")
|
||||
|
||||
steam_commands = {
|
||||
"start": {
|
||||
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||
"windows": "start steam://open/bigpicture"
|
||||
},
|
||||
"stop": {
|
||||
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||
# TODO: check for different Steam install paths
|
||||
},
|
||||
"exit": {
|
||||
"unix": None, # TODO: find a way to exit Steam Big Picture
|
||||
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||
# TODO: need to test (thx @MasterHidra https://www.reddit.com/r/Steam/comments/5c9l20/comment/k5fmb3k)
|
||||
}
|
||||
}
|
||||
|
||||
command = steam_commands.get(action)
|
||||
|
||||
if command is None:
|
||||
raise HomeAssistantError(
|
||||
f"Invalid action {action} for Steam Big Picture on system running at {connection.host}.")
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run(command.get("unix"))
|
||||
else:
|
||||
result = connection.run(command.get("windows"))
|
||||
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not {action} Steam Big Picture on system running at {connection.host}.")
|
||||
|
||||
|
||||
def get_audio_config(connection: Connection):
|
||||
if is_unix_system(connection):
|
||||
config = {'sinks': [], 'sources': []}
|
||||
|
||||
def parse_device_info(lines, device_type):
|
||||
devices = []
|
||||
current_device = {}
|
||||
|
||||
for line in lines:
|
||||
if line.startswith(f"{device_type} #"):
|
||||
if current_device and "Monitor" not in current_device['description']:
|
||||
devices.append(current_device)
|
||||
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||
elif line.startswith(" Name:"):
|
||||
current_device['name'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" State:"):
|
||||
current_device['state'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" Description:"):
|
||||
current_device['description'] = line.split(":")[1].strip()
|
||||
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
|
||||
return devices
|
||||
|
||||
# Get sinks
|
||||
result = connection.run("LANG=en_US.UTF-8 pactl list sinks")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get audio sinks on system running at {connection.host}.")
|
||||
config['sinks'] = parse_device_info(result.stdout.split('\n'), 'Sink')
|
||||
|
||||
# Get sources
|
||||
result = connection.run("LANG=en_US.UTF-8 pactl list sources")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get audio sources on system running at {connection.host}.")
|
||||
config['sources'] = parse_device_info(result.stdout.split('\n'), 'Source')
|
||||
|
||||
return config
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def change_audio_config(connection: Connection, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||
output_device: str = "@DEFAULT_SINK@"):
|
||||
"""Change audio configuration on the host system."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
current_config = get_audio_config(connection)
|
||||
executable = "pactl"
|
||||
commands = []
|
||||
|
||||
def get_device_id(device_type, user_device):
|
||||
for device in current_config[device_type]:
|
||||
if device['description'] == user_device:
|
||||
return device['name']
|
||||
return user_device
|
||||
|
||||
# Set default sink and source if not specified
|
||||
if not output_device:
|
||||
output_device = "@DEFAULT_SINK@"
|
||||
if not input_device:
|
||||
input_device = "@DEFAULT_SOURCE@"
|
||||
|
||||
# Set default sink if specified
|
||||
if output_device and output_device != "@DEFAULT_SINK@":
|
||||
output_device = get_device_id('sinks', output_device)
|
||||
commands.append(f"{executable} set-default-sink {output_device}")
|
||||
|
||||
# Set default source if specified
|
||||
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||
input_device = get_device_id('sources', input_device)
|
||||
commands.append(f"{executable} set-default-source {input_device}")
|
||||
|
||||
# Set sink volume if specified
|
||||
if volume is not None:
|
||||
commands.append(f"{executable} set-sink-volume {output_device} {volume}%")
|
||||
|
||||
# Set sink and source mute status if specified
|
||||
if mute is not None:
|
||||
commands.append(f"{executable} set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||
commands.append(f"{executable} set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||
|
||||
# Execute commands
|
||||
for command in commands:
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(
|
||||
f"Could not change audio config on system running on {connection.host}, check logs with debug")
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def get_debug_info(connection: Connection):
|
||||
"""Return debug information about the host system."""
|
||||
|
||||
data = {}
|
||||
|
||||
data_os = {
|
||||
'name': get_operating_system(connection),
|
||||
'version': get_operating_system_version(connection),
|
||||
'is_unix': is_unix_system(connection)
|
||||
}
|
||||
|
||||
data_ssh = {
|
||||
'is_connected': connection.is_connected,
|
||||
'username': connection.user,
|
||||
'host': connection.host,
|
||||
'port': connection.port
|
||||
}
|
||||
|
||||
data_grub = {
|
||||
'windows_entry': get_windows_entry_in_grub(connection)
|
||||
}
|
||||
|
||||
data_audio = {
|
||||
'speakers': get_audio_config(connection).get('sinks'),
|
||||
'microphones': get_audio_config(connection).get('sources')
|
||||
}
|
||||
|
||||
data['os'] = data_os
|
||||
data['ssh'] = data_ssh
|
||||
data['grub'] = data_grub
|
||||
data['audio'] = data_audio
|
||||
data['monitors'] = get_monitors_config(connection)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False):
|
||||
"""Return a list of Bluetooth devices connected to the host system."""
|
||||
|
||||
commands = {
|
||||
"unix": "bash -c \'bluetoothctl devices | cut -f2 -d\\' \\' | while read uuid; do bluetoothctl info $uuid; done|grep -e \"Device\\|Connected\\|Name\"\'",
|
||||
"windows": "",
|
||||
}
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run(commands["unix"])
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get Bluetooth devices on system running at {connection.host}.")
|
||||
|
||||
devices = []
|
||||
current_device = {}
|
||||
|
||||
for line in result.stdout.split('\n'):
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.startswith("Device"):
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
current_device = {"address": line.split()[1]}
|
||||
elif line.startswith("Name:"):
|
||||
current_device["name"] = line.split(":")[1].strip()
|
||||
elif line.startswith("Connected:"):
|
||||
current_device["connected"] = line.split(":")[1].strip()
|
||||
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
|
||||
if only_connected:
|
||||
devices = [device for device in devices if device["connected"] == "yes"]
|
||||
|
||||
if return_as_string:
|
||||
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
||||
|
||||
return devices
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
@ -1,5 +1,4 @@
|
||||
fabric2~=3.2.2
|
||||
paramiko~=3.4.0
|
||||
voluptuous~=0.14.2
|
||||
wakeonlan~=3.1.0
|
||||
homeassistant~=2024.3.1
|
||||
homeassistant
|
||||
paramiko~=3.5.0
|
||||
voluptuous~=0.15.2
|
Loading…
Reference in New Issue
Block a user