Compare commits

...

30 Commits
1.2.0 ... main

Author SHA1 Message Date
f82826fc0a
improve structure and refactor 2024-10-19 12:38:26 +02:00
67df62ff91
improve ssh client class 2024-10-19 12:02:45 +02:00
59ff04775c
fixed sudo permission for some commands 2024-10-19 11:42:57 +02:00
08d9f78a35
improved ssh connections and reliability 2024-10-19 11:26:43 +02:00
591396895d
convert exception to debug log and check if on before update 2024-10-18 10:37:41 +02:00
daf2d6d63e Merge pull request 'fixed buggy ssh connections' (!1) from v2 into main
Reviewed-on: #1
2024-10-18 10:29:36 +02:00
94a65178c0
fixed buggy ssh connections 2024-10-17 22:11:10 +02:00
b4b0bead89
update readme 2024-08-30 11:31:13 +02:00
723b7b1284
improve to use less sudo commands 2024-08-30 11:19:48 +02:00
eaf434efc8
implement nircmd instlal and steam actions 2024-08-30 10:12:22 +02:00
767d877442
implement change audio config service 2024-08-30 09:55:51 +02:00
3e13d1ddb7
implement monitor config, rework of commands def and other fixes/improvements 2024-08-26 22:14:00 +02:00
8f2f03f134
fix strip error on CommandOutput and fix desktop env detect. linux 2024-08-26 20:51:33 +02:00
af870c63eb
move computer to submodule, add CommandOutput, remove useless getters, 2024-08-26 20:39:22 +02:00
3a3775f457
reformat, implement audio, monitors, bluetooth parse 2024-08-26 19:16:44 +02:00
3fc5fb948f
add audio and bluetooth commands, remove paramiko exception in config flow 2024-08-26 19:01:32 +02:00
bc16d06ac6
add gnome monitor parsing 2024-08-26 17:43:15 +02:00
e67e29facc
don't create new connection each time 2024-08-26 14:44:20 +02:00
7e6736c8b3
convert all to async (somehow it works ?? :)), update reqs and manifest 2024-08-26 14:29:09 +02:00
0f72986d2a
improve run_action and use fabric 2024-08-26 13:56:29 +02:00
ab2c8e0376
start working on massive refactor (v2) 2024-08-25 23:20:39 +02:00
e5b6af8e41
some small text refactor 2024-06-09 11:42:28 +02:00
b771739161
add bluetooth info to debug, fix bluetooth detections issues, add more info in error message 2024-06-09 11:34:58 +02:00
ad53165be2
improve install linux script 2024-06-09 11:31:24 +02:00
10324f542b
new bluetooth parsing method 2024-06-01 15:11:59 +02:00
d69ec9c352
disable blocking update if bluetooth devices retrieval fails 2024-06-01 14:44:00 +02:00
601258a6a3
fix for python3.12 and updates some packages 2024-03-17 12:27:13 +01:00
028769faac
fix wrong user when using sudo in auto script
Some checks failed
Validate with hassfest / validate (push) Failing after 1s
HACS Action / HACS Action (push) Failing after 3s
2024-01-01 19:56:50 +01:00
9d6f39d7ac
improved install script to allow for launching GUI apps from SSH
Some checks failed
Validate with hassfest / validate (push) Failing after 2s
HACS Action / HACS Action (push) Failing after 4s
2024-01-01 18:57:37 +01:00
f5747bd6d8
fix the resolution twice to fix gnome display bug
Some checks failed
Validate with hassfest / validate (push) Failing after 2s
HACS Action / HACS Action (push) Failing after 4s
2024-01-01 17:45:22 +01:00
17 changed files with 902 additions and 804 deletions

View File

@ -1,23 +1,69 @@
#!/bin/bash #!/bin/bash
# Retrieve the username of the person who invoked sudo
USER_BEHIND_SUDO=$(who am i | awk '{print $1}')
# Function to print colored text
print_colored() {
local color="$1"
local text="$2"
echo -e "${color}${text}\033[0m"
}
# Define colors
COLOR_RED="\033[0;31m"
COLOR_GREEN="\033[0;32m"
COLOR_YELLOW="\033[1;33m"
COLOR_BLUE="\033[0;34m"
# Ask for HomeAssistant IP
print_colored "$COLOR_BLUE" "Please enter your HomeAssistant local IP address (even if behind proxy, need LAN address):"
read -r HOMEASSISTANT_IP
# Enable SSH Server # Enable SSH Server
print_colored "$COLOR_BLUE" "Enabling SSH Server..."
if command -v systemctl &> /dev/null; then if command -v systemctl &> /dev/null; then
sudo systemctl enable --now sshd sudo systemctl enable --now sshd
print_colored "$COLOR_GREEN" "SSH Server enabled successfully."
else else
echo "Systemctl not found. Please enable SSH manually." print_colored "$COLOR_RED" "Systemctl not found. Please enable SSH manually."
fi fi
# Configure sudoers # Configure sudoers
echo "Configuring sudoers..." print_colored "$COLOR_BLUE" "Configuring sudoers..."
echo -e "\n# Allow your user to execute specific commands without a password" | sudo tee -a /etc/sudoers echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers > /dev/null
echo "$(whoami) ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/bin/systemctl, /usr/sbin/pm-suspend, /usr/bin/awk, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot" | sudo tee -a /etc/sudoers echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/sbin/pm-suspend, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot, /usr/bin/cat /etc/grub2.cfg, /usr/bin/cat /etc/grub.cfg, /usr/bin/systemctl poweroff, /usr/bin/systemctl reboot, /usr/bin/systemctl suspend" | sudo tee -a /etc/sudoers > /dev/null
print_colored "$COLOR_GREEN" "Sudoers file configured successfully."
# Firewall Configuration # Firewall Configuration
print_colored "$COLOR_BLUE" "Configuring firewall..."
if command -v ufw &> /dev/null; then if command -v ufw &> /dev/null; then
echo "Configuring firewall..."
sudo ufw allow 22 sudo ufw allow 22
print_colored "$COLOR_GREEN" "Firewall configured to allow SSH."
else else
echo "UFW not found. Please configure the firewall manually." print_colored "$COLOR_RED" "UFW not found. Please configure the firewall manually (if needed)."
fi fi
echo "You can now add your computer to HomeAssistant." # Setup xhost for GUI apps
print_colored "$COLOR_BLUE" "Configuring persistent xhost for starting GUI apps (like Steam)..."
COMMANDS="xhost +$HOMEASSISTANT_IP; xhost +localhost"
DESKTOP_ENTRY_NAME="EasyComputerManager-AutoStart"
DESKTOP_ENTRY_PATH="/home/$USER_BEHIND_SUDO/.config/autostart/$DESKTOP_ENTRY_NAME.desktop"
# Create the desktop entry file for the Desktop Environment to autostart at login every reboot
cat > "$DESKTOP_ENTRY_PATH" <<EOF
[Desktop Entry]
Type=Application
Name=$DESKTOP_ENTRY_NAME
Exec=sh -c '$COMMANDS'
Hidden=false
NoDisplay=false
X-GNOME-Autostart-enabled=true
EOF
chmod +x "$DESKTOP_ENTRY_PATH"
print_colored "$COLOR_GREEN" "Desktop entry created at $DESKTOP_ENTRY_PATH."
print_colored "$COLOR_GREEN" "\nDone! Some features may require a reboot to work including:"
print_colored "$COLOR_YELLOW" " - Starting GUI apps from HomeAssistant"
print_colored "$COLOR_GREEN" "\nYou can now add your computer to HomeAssistant."
print_colored "$COLOR_RED" "\nWARNING : Don't forget to install these packages : gnome-monitor-config, pactl, bluetoothctl"

View File

@ -5,7 +5,9 @@
![img.png](.images/header.png) ![img.png](.images/header.png)
Easy Computer Manager is a custom integration for Home Assistant that allows you to remotely manage various aspects of your computer, such as sending Wake-On-LAN (WoL) packets, restarting the computer between different operating systems (if dual-boot), adjusting audio configurations, changing monitor settings, and more. Easy Computer Manager is a custom integration for Home Assistant that allows you to remotely manage various aspects of
your computer, such as sending Wake-On-LAN (WoL) packets, restarting the computer between different operating systems (
if dual-boot), adjusting audio configurations, changing monitor settings, and more.
## Features ## Features
@ -26,33 +28,55 @@ Easy Computer Manager is a custom integration for Home Assistant that allows you
1. Install [HACS](https://hacs.xyz/) if not already installed. 1. Install [HACS](https://hacs.xyz/) if not already installed.
2. In Home Assistant, go to "HACS" in the sidebar. 2. In Home Assistant, go to "HACS" in the sidebar.
3. Click on "Integrations." 3. Click on "Integrations."
4. Search for "easy_computer_manager" and click "Install." 4. Click on the three dots in the top right corner and select "Custom repositories."
5. Paste the following URL in the "Repo" field: https://github.com/M4TH1EU/HA-EasyComputerManager
6. Select "Integration" from the "Category" dropdown.
7. Click "Add."
8. Search for "easy_computer_manager" and click "Install."
### Manually ### Manually
1. Download the latest release from the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager/). 1. Download the latest release from the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager/).
2. Extract the downloaded ZIP file. 2. Extract the downloaded ZIP file.
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your Home Assistant instance. 3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your
Home Assistant instance.
## Configuration # Preparing your computer (required)
> [!CAUTION]
> Before adding your computer to Home Assistant, ensure that it is properly configured.
**See wiki page [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Prepare-your-computer).**
## Add Computer to Home Assistant
1. In Home Assistant, go to "Configuration" in the sidebar. 1. In Home Assistant, go to "Configuration" in the sidebar.
2. Select "Integrations." 2. Select "Integrations."
3. Click the "+" button to add a new integration. 3. Click the "+" button to add a new integration.
4. Search for "Easy Computer Manager" and select it from the list. 4. Search for "Easy Computer Manager" and select it from the list.
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, username, and password for the computer you want to manage. 5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, mac-adress, username,
and password for the computer you want to add.
6. Once configured, click "Finish" to add the computer to Home Assistant. 6. Once configured, click "Finish" to add the computer to Home Assistant.
> [!NOTE]
> If you are managing a dual-boot computer, ensure that the "Dual boot system" checkbox is enabled during the configuration.
## Usage ## Usage
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your smart home setup. After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the
available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your
smart home setup.
## Services
A detailed list of available services and their parameters can be found in the wiki [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Services).
## Troubleshooting ## Troubleshooting
If you encounter any issues during installation or configuration, refer to the troubleshooting section in the [Wiki](./wiki) or seek assistance from the Home Assistant community. If you encounter any issues during installation or configuration, refer to the troubleshooting section in
the [Wiki](./wiki) or seek assistance from the Home Assistant community.
## Contributions ## Contributions
Contributions are welcome! If you find any bugs or have suggestions for improvements, please open an issue or submit a pull request on the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager). Contributions are welcome! If you find any bugs or have suggestions for improvements, please open an issue or submit a
pull request on the [GitHub repository](https://github.com/M4TH1EU/HA-EasyComputerManager).
Happy automating with Easy Computer Manager! Happy automating with Easy Computer Manager!

View File

@ -3,6 +3,7 @@
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component) # Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
from __future__ import annotations from __future__ import annotations
import logging import logging
from functools import partial from functools import partial
@ -15,7 +16,7 @@ from homeassistant.core import HomeAssistant, ServiceCall
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
_LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({ WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
vol.Required(CONF_MAC): cv.string, vol.Required(CONF_MAC): cv.string,
@ -39,7 +40,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
if broadcast_port is not None: if broadcast_port is not None:
service_kwargs["port"] = broadcast_port service_kwargs["port"] = broadcast_port
_LOGGER.info( LOGGER.info(
"Sending magic packet to MAC %s (broadcast: %s, port: %s)", "Sending magic packet to MAC %s (broadcast: %s, port: %s)",
mac_address, mac_address,
broadcast_address, broadcast_address,
@ -58,11 +59,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA, schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
) )
hass.async_create_task( await hass.config_entries.async_forward_entry_setups(entry, ["switch"])
hass.config_entries.async_forward_entry_setup(
entry, "switch"
)
)
return True return True

View File

@ -0,0 +1,208 @@
import asyncio
from typing import Optional, Dict, Any
from wakeonlan import send_magic_packet
from custom_components.easy_computer_manager import const, LOGGER
from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput
from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args, format_pactl_commands
from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \
parse_bluetoothctl
from custom_components.easy_computer_manager.computer.ssh_client import SSHClient
class Computer:
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
dualboot: bool = False) -> None:
"""Initialize the Computer object."""
self.initialized = False # used to avoid duplicated ssh connections
self.host = host
self.mac = mac
self.username = username
self._password = password
self.port = port
self.dualboot = dualboot
self.operating_system: Optional[OSType] = None
self.operating_system_version: Optional[str] = None
self.desktop_environment: Optional[str] = None
self.windows_entry_grub: Optional[str] = None
self.monitors_config: Optional[Dict[str, Any]] = None
self.audio_config: Dict[str, Optional[Dict]] = {}
self.bluetooth_devices: Dict[str, Any] = {}
self._connection: SSHClient = SSHClient(host, username, password, port)
asyncio.create_task(self._connection.connect(computer=self))
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
"""Update computer details."""
if not state or not await self.is_on():
LOGGER.debug("Computer is off, skipping update")
return
# Ensure connection is established before updating
await self._ensure_connection_alive(timeout)
# Update tasks
await asyncio.gather(
self._update_operating_system(),
self._update_operating_system_version(),
self._update_desktop_environment(),
self._update_windows_entry_grub(),
self._update_monitors_config(),
self._update_audio_config(),
self._update_bluetooth_devices()
)
async def _ensure_connection_alive(self, timeout: int) -> None:
"""Ensure the SSH connection is alive, reconnect if necessary."""
for _ in range(timeout * 4):
if self._connection.is_connection_alive():
return
await asyncio.sleep(0.25)
if not self._connection.is_connection_alive():
LOGGER.debug(f"Reconnecting to {self.host}")
await self._connection.connect()
if not self._connection.is_connection_alive():
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
raise ConnectionError("SSH connection could not be re-established")
async def _update_operating_system(self) -> None:
"""Update the operating system information."""
self.operating_system = await self._detect_operating_system()
async def _update_operating_system_version(self) -> None:
"""Update the operating system version."""
self.operating_system_version = (await self.run_action("operating_system_version")).output
async def _update_desktop_environment(self) -> None:
"""Update the desktop environment information."""
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
async def _update_windows_entry_grub(self) -> None:
"""Update Windows entry in GRUB (if applicable)."""
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
async def _update_monitors_config(self) -> None:
"""Update monitors configuration."""
if self.operating_system == OSType.LINUX:
output = (await self.run_action("get_monitors_config")).output
self.monitors_config = parse_gnome_monitors_output(output)
# TODO: Implement for Windows if needed
async def _update_audio_config(self) -> None:
"""Update audio configuration."""
speakers_output = (await self.run_action("get_speakers")).output
microphones_output = (await self.run_action("get_microphones")).output
if self.operating_system == OSType.LINUX:
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
# TODO: Implement for Windows
async def _update_bluetooth_devices(self) -> None:
"""Update Bluetooth devices list."""
if self.operating_system == OSType.LINUX:
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
# TODO: Implement for Windows
async def _detect_operating_system(self) -> OSType:
"""Detect the operating system by running a uname command."""
result = await self.run_manually("uname")
return OSType.LINUX if result.successful() else OSType.WINDOWS
async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on by pinging it."""
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
proc = await asyncio.create_subprocess_exec(
*ping_cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await proc.communicate()
return proc.returncode == 0
async def start(self) -> None:
"""Start the computer using Wake-on-LAN."""
send_magic_packet(self.mac)
async def shutdown(self) -> None:
"""Shutdown the computer."""
await self.run_action("shutdown")
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
"""Restart the computer."""
await self.run_action("restart")
async def put_to_sleep(self) -> None:
"""Put the computer to sleep."""
await self.run_action("sleep")
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Set monitors configuration."""
if self.is_linux() and self.desktop_environment == 'gnome':
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
"""Set audio configuration."""
if self.is_linux() and self.desktop_environment == 'gnome':
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
for command in pactl_commands:
await self.run_action("set_audio_config", params={"args": command})
async def install_nircmd(self) -> None:
"""Install NirCmd tool (Windows specific)."""
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
await self.run_action("install_nircmd", params={
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
"install_path": install_path
})
async def steam_big_picture(self, action: str) -> None:
"""Start, stop, or exit Steam Big Picture mode."""
await self.run_action(f"{action}_steam_big_picture")
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
raise_on_error: bool = False) -> CommandOutput:
"""Run a predefined action via SSH."""
params = params or {}
action = const.ACTIONS.get(id)
if not action:
LOGGER.error(f"Action {id} not found.")
return CommandOutput("", 1, "", "Action not found")
if not self.operating_system:
self.operating_system = await self._detect_operating_system()
os_commands = action.get(self.operating_system.lower())
if not os_commands:
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
[os_commands.get("command")])
required_params = []
if "params" in os_commands:
required_params = os_commands.get("params", [])
# Validate parameters
if sorted(required_params) != sorted(params.keys()):
raise ValueError(f"Invalid/missing parameters for action: {id}")
for command in commands:
for param, value in params.items():
command = command.replace(f"%{param}%", str(value))
result = await self.run_manually(command)
if result.successful():
return result
elif raise_on_error:
raise ValueError(f"Command failed: {command}")
return result
async def run_manually(self, command: str) -> CommandOutput:
"""Run a custom command manually via SSH."""
return await self._connection.execute_command(command)

View File

@ -0,0 +1,18 @@
from enum import Enum
class OSType(str, Enum):
WINDOWS = "Windows"
LINUX = "Linux"
MACOS = "MacOS"
class CommandOutput:
def __init__(self, command: str, return_code: int, output: str, error: str) -> None:
self.command = command
self.return_code = return_code
self.output = output.strip()
self.error = error.strip()
def successful(self) -> bool:
return self.return_code == 0

View File

@ -0,0 +1,62 @@
def format_gnome_monitors_args(monitors_config: dict):
args = []
monitors_config = monitors_config.get('monitors_config', {})
for monitor, settings in monitors_config.items():
if settings.get('enabled', False):
args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
if 'position' in settings:
args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
if 'mode' in settings:
args.extend(['-m', settings["mode"]])
if 'scale' in settings:
args.extend(['-s', str(settings["scale"])])
if 'transform' in settings:
args.extend(['-t', settings["transform"]])
return ' '.join(args)
def format_pactl_commands(current_config: {}, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
output_device: str = "@DEFAULT_SINK@"):
"""Change audio configuration on the host system."""
commands = []
def get_device_id(device_type, user_device):
for device in current_config[device_type]:
if device['description'] == user_device:
return device['name']
return user_device
# Set default sink and source if not specified
if not output_device:
output_device = "@DEFAULT_SINK@"
if not input_device:
input_device = "@DEFAULT_SOURCE@"
# Set default sink if specified
if output_device and output_device != "@DEFAULT_SINK@":
output_device = get_device_id('sinks', output_device)
commands.append(f"set-default-sink {output_device}")
# Set default source if specified
if input_device and input_device != "@DEFAULT_SOURCE@":
input_device = get_device_id('sources', input_device)
commands.append(f"set-default-source {input_device}")
# Set sink volume if specified
if volume is not None:
commands.append(f"set-sink-volume {output_device} {volume}%")
# Set sink and source mute status if specified
if mute is not None:
commands.append(f"set-sink-mute {output_device} {'yes' if mute else 'no'}")
commands.append(f"set-source-mute {input_device} {'yes' if mute else 'no'}")
return commands

View File

@ -0,0 +1,168 @@
import re
from custom_components.easy_computer_manager import LOGGER
from custom_components.easy_computer_manager.computer import CommandOutput
def parse_gnome_monitors_output(config: str) -> list:
"""
Parse the GNOME monitors configuration.
:param config:
The output of the gnome-monitor-config list command.
:type config: str
:returns: list
The parsed monitors configuration.
"""
monitors = []
current_monitor = None
for line in config.split('\n'):
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
if monitor_match:
if current_monitor:
monitors.append(current_monitor)
source, status = monitor_match.groups()
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
elif current_monitor:
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
if display_name_match:
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
elif resolution_match:
# Don't include resolutions under 1280x720
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
# If there are already resolutions in the list, check if the framerate between the last is >1
if len(current_monitor['resolutions']) > 0:
last_resolution = current_monitor['resolutions'][-1]
last_resolution_size = last_resolution.split('@')[0]
this_resolution_size = resolution_match.group(1).split('@')[0]
# Only truncate some framerates if the resolution are the same
if last_resolution_size == this_resolution_size:
last_resolution_framerate = float(last_resolution.split('@')[1])
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
# If the difference between the last resolution framerate and this one is >1, ignore it
if last_resolution_framerate - 1 > this_resolution_framerate:
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# If the resolution is different, this adds the new resolution
# to the list without truncating
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# This is the first resolution, add it to the list
current_monitor['resolutions'].append(resolution_match.group(1))
if current_monitor:
monitors.append(current_monitor)
return monitors
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
"""
Parse the pactl audio configuration.
:param config_speakers:
The output of the pactl list sinks command.
:param config_microphones:
The output of the pactl list sources command.
:type config_speakers: str
:type config_microphones: str
:returns: dict
The parsed audio configuration.
"""
config = {'speakers': [], 'microphones': []}
def parse_device_info(lines, device_type):
devices = []
current_device = {}
for line in lines:
if line.startswith(f"{device_type} #"):
if current_device and "Monitor" not in current_device['description']:
devices.append(current_device)
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
elif line.startswith(" Name:"):
current_device['name'] = line.split(":")[1].strip()
elif line.startswith(" State:"):
current_device['state'] = line.split(":")[1].strip()
elif line.startswith(" Description:"):
current_device['description'] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
return devices
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
return config
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
return_as_string: bool = False) -> list | str:
"""Parse the bluetoothctl info command.
:param command:
The command output.
:param connected_devices_only:
Only return connected devices.
Will return all devices, connected or not, if False.
:type command: :class: CommandOutput
:type connected_devices_only: bool
:returns: str | list
The parsed bluetooth devices.
"""
if not command.successful():
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
return "" if return_as_string else []
else:
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
return "" if return_as_string else []
devices = []
current_device = None
for line in command.output.split('\n'):
if line.startswith('Device'):
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
current_device = line.split()[1]
current_name = None
current_connected = None
elif 'Name:' in line:
current_name = line.split(': ', 1)[1]
elif 'Connected:' in line:
current_connected = line.split(': ')[1] == 'yes'
# Add the last device if any
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
if connected_devices_only:
devices = [device for device in devices if device["connected"] == True]
return devices

View File

@ -0,0 +1,104 @@
import asyncio
from typing import Optional
import paramiko
from custom_components.easy_computer_manager import LOGGER
from custom_components.easy_computer_manager.computer import CommandOutput
class SSHClient:
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
self.host = host
self.username = username
self._password = password
self.port = port
self._connection: Optional[paramiko.SSHClient] = None
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.disconnect()
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
"""Open an SSH connection using Paramiko asynchronously."""
if self.is_connection_alive():
LOGGER.debug(f"Connection to {self.host} is already active.")
return
self.disconnect() # Ensure any previous connection is closed
loop = asyncio.get_running_loop()
client = paramiko.SSHClient()
# Set missing host key policy to automatically accept unknown host keys
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# Offload the blocking connect call to a thread
await loop.run_in_executor(None, self._blocking_connect, client)
self._connection = client
LOGGER.debug(f"Connected to {self.host}")
except (OSError, paramiko.SSHException) as exc:
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
if not retried:
LOGGER.debug(f"Retrying connection to {self.host}...")
await self.connect(retried=True) # Retry only once
finally:
if computer is not None and hasattr(computer, "initialized"):
computer.initialized = True
def disconnect(self) -> None:
"""Close the SSH connection."""
if self._connection:
self._connection.close()
LOGGER.debug(f"Disconnected from {self.host}")
self._connection = None
def _blocking_connect(self, client: paramiko.SSHClient):
"""Perform the blocking SSH connection using Paramiko."""
client.connect(
hostname=self.host,
username=self.username,
password=self._password,
port=self.port,
look_for_keys=False, # Set this to True if using private keys
allow_agent=False
)
async def execute_command(self, command: str) -> CommandOutput:
"""Execute a command on the SSH server asynchronously."""
if not self.is_connection_alive():
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
await self.connect()
try:
# Offload command execution to avoid blocking
loop = asyncio.get_running_loop()
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
exit_status = stdout.channel.recv_exit_status()
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
except (paramiko.SSHException, EOFError) as exc:
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
return CommandOutput(command, -1, "", "")
def is_connection_alive(self) -> bool:
"""Check if the SSH connection is still alive."""
if self._connection is None:
return False
try:
transport = self._connection.get_transport()
transport.send_ignore()
self._connection.exec_command('ls', timeout=1)
return True
except Exception as e:
return False

View File

@ -0,0 +1,40 @@
async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
"""Return debug information about the host system."""
data = {
'os': {
'name': computer.operating_system,
'version': computer.operating_system_version,
'desktop_environment': computer.desktop_environment
},
'connection': {
'host': computer.host,
'mac': computer.mac,
'username': computer.username,
'port': computer.port,
'dualboot': computer.dualboot,
'is_on': await computer.is_on(),
'is_connected': computer._connection.is_connection_alive()
},
'grub': {
'windows_entry': computer.windows_entry_grub
},
'audio': {
'speakers': computer.audio_config.get('speakers'),
'microphones': computer.audio_config.get('microphones')
},
'monitors': computer.monitors_config,
'bluetooth_devices': computer.bluetooth_devices
}
return data
def get_bluetooth_devices_as_str(computer: 'Computer') -> str:
"""Return the bluetooth devices as a string."""
devices = computer.bluetooth_devices
if not devices:
return ""
return "; ".join([f"{device['name']} ({device['address']})" for device in devices])

View File

@ -7,9 +7,8 @@ from typing import Any
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries, exceptions from homeassistant import config_entries, exceptions
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from paramiko.ssh_exception import AuthenticationException
from . import utils from .computer import Computer
from .const import DOMAIN from .const import DOMAIN
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -40,6 +39,8 @@ class Hub:
self._name = host self._name = host
self._id = host.lower() self._id = host.lower()
self.computer = Computer(host, "", username, password, port)
@property @property
def hub_id(self) -> str: def hub_id(self) -> str:
"""ID for dummy.""" """ID for dummy."""
@ -48,9 +49,11 @@ class Hub:
async def test_connection(self) -> bool: async def test_connection(self) -> bool:
"""Test connectivity to the computer is OK.""" """Test connectivity to the computer is OK."""
try: try:
return utils.test_connection( # TODO: check if reachable
utils.create_ssh_connection(self._host, self._username, self._password, self._port)) _LOGGER.info("Testing connection to %s", self._host)
except AuthenticationException: return True
except Exception:
return False return False
@ -63,6 +66,7 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]:
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"]) hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
_LOGGER.info("Validating configuration")
if not await hub.test_connection(): if not await hub.test_connection():
raise CannotConnect raise CannotConnect
@ -81,7 +85,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
try: try:
info = await validate_input(self.hass, user_input) info = await validate_input(self.hass, user_input)
return self.async_create_entry(title=info["title"], data=user_input) return self.async_create_entry(title=info["title"], data=user_input)
except (AuthenticationException, CannotConnect, InvalidHost) as ex: except (CannotConnect, InvalidHost) as ex:
errors["base"] = str(ex) errors["base"] = str(ex)
except Exception as ex: # pylint: disable=broad-except except Exception as ex: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception: %s", ex) _LOGGER.exception("Unexpected exception: %s", ex)

View File

@ -11,3 +11,87 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config"
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture" SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config" SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
SERVICE_DEBUG_INFO = "debug_info" SERVICE_DEBUG_INFO = "debug_info"
ACTIONS = {
"operating_system": {
"linux": ["uname"]
},
"operating_system_version": {
"windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'],
"linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"]
},
"desktop_environment": {
"linux": ["for session in $(ls /usr/bin/*session 2>/dev/null); do basename $session | sed 's/-session//'; done | grep -E 'gnome|kde|xfce|mate|lxde|cinnamon|budgie|unity' | head -n 1"],
"windows": ["echo Windows"]
},
"shutdown": {
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"],
"linux": ["sudo /sbin/shutdown -h now", "sudo /sbin/init 0", "sudo /usr/bin/systemctl poweroff"]
},
"restart": {
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"],
"linux": ["sudo /sbin/shutdown -r now", "sudo /sbin/init 6", "sudo /usr/bin/systemctl reboot"]
},
"sleep": {
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"],
"linux": ["sudo /usr/bin/systemctl suspend", "sudo /usr/sbin/pm-suspend"]
},
"get_windows_entry_grub": {
"linux": ["sudo /usr/bin/cat /etc/grub2.cfg | awk -F \"'\" '/windows/ {print $2}'",
"sudo /usr/bin/cat /etc/grub.cfg | awk -F \"'\" '/windows/ {print $2}'"]
},
"set_grub_entry": {
"linux": {
"commands": ["sudo /usr/sbin/grub-reboot %grub-entry%", "sudo /usr/sbin/grub2-reboot %grub-entry%"],
"params": ["grub-entry"],
}
},
"get_monitors_config": {
"linux": ["gnome-monitor-config list"]
},
"set_monitors_config": {
"linux": {
"gnome": {
"command": "gnome-monitor-config set %args%",
"params": ["args"]
}
}
},
"get_speakers": {
"linux": ["LANG=en_US.UTF-8 pactl list sinks"]
},
"get_microphones": {
"linux": ["LANG=en_US.UTF-8 pactl list sources"]
},
"set_audio_config": {
"linux": {
"command": "LANG=en_US.UTF-8 pactl %args%",
"params": ["args"]
}
},
"get_bluetooth_devices": {
"linux": {
"command": "bluetoothctl info",
"raise_on_error": False,
}
},
"install_nirmcd": {
"windows": {
"command": "powershell -Command \"Invoke-WebRequest -Uri %download_url% -OutFile %install_path%\\nircmd.zip -UseBasicParsing; Expand-Archive %install_path%\\nircmd.zip -DestinationPath %install_path%; Remove-Item %install_path%\\nircmd.zip\"",
"params": ["download_url", "install_path"]
}
},
"start_steam_big_picture": {
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
"windows": "start steam://open/bigpicture"
},
"stop_steam_big_picture": {
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
},
"exit_steam_big_picture": {
"linux": "", # TODO: find a way to exit steam big picture
"windows": "nircmd win close title \"Steam Big Picture Mode\""
},
}

View File

@ -2,8 +2,7 @@
"domain": "easy_computer_manager", "domain": "easy_computer_manager",
"name": "Easy Computer Manager", "name": "Easy Computer Manager",
"codeowners": [ "codeowners": [
"@M4TH1EU", "@M4TH1EU"
"@ntilley905"
], ],
"config_flow": true, "config_flow": true,
"dependencies": [], "dependencies": [],
@ -11,8 +10,8 @@
"iot_class": "local_polling", "iot_class": "local_polling",
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues", "issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
"requirements": [ "requirements": [
"wakeonlan==2.1.0", "wakeonlan==3.1.0",
"fabric2==2.7.1" "asyncssh==2.16.0"
], ],
"version": "1.1.0" "version": "2.0.0"
} }

View File

@ -1,80 +1,53 @@
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging from typing import Any, Dict
import subprocess as sp
from typing import Any
import voluptuous as vol import voluptuous as vol
import wakeonlan from homeassistant.components.switch import SwitchEntity
from homeassistant.components.switch import (SwitchEntity)
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ( from homeassistant.const import (
CONF_HOST, CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
CONF_MAC,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME, )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import (
device_registry as dr,
entity_platform,
) )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.helpers import entity_platform, device_registry as dr
from homeassistant.helpers.config_validation import make_entity_service_schema from homeassistant.helpers.config_validation import make_entity_service_schema
from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from paramiko.ssh_exception import AuthenticationException
from . import utils from .computer import OSType, Computer
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ from .const import (
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
_LOGGER = logging.getLogger(__name__) SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
)
async def async_setup_entry( async def async_setup_entry(
hass: HomeAssistant, hass: HomeAssistant,
config: ConfigEntry, config: ConfigEntry,
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback
) -> None: ) -> None:
# Retrieve the data from the config flow """Set up the computer switch from a config entry."""
mac_address: str = config.data.get(CONF_MAC) mac_address = config.data[CONF_MAC]
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS) host = config.data[CONF_HOST]
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT) name = config.data[CONF_NAME]
host: str = config.data.get(CONF_HOST) dualboot = config.data.get("dualboot", False)
name: str = config.data.get(CONF_NAME) username = config.data[CONF_USERNAME]
dualboot: bool = config.data.get("dualboot") password = config.data[CONF_PASSWORD]
username: str = config.data.get(CONF_USERNAME) port = config.data.get(CONF_PORT)
password: str = config.data.get(CONF_PASSWORD)
port: int | None = config.data.get(CONF_PORT)
# Register the computer switch
async_add_entities( async_add_entities(
[ [ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
ComputerSwitch( True
hass,
name,
host,
mac_address,
dualboot,
username,
password,
port,
),
],
host is not None,
) )
platform = entity_platform.async_get_current_platform() platform = entity_platform.async_get_current_platform()
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse) # Service registrations
services = [ services = [
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE), (SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
@ -91,18 +64,18 @@ async def async_setup_entry(
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY), (SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
] ]
# Register the services that depends on the switch # Register services with their schemas
for service in services: for service_name, schema, supports_response in services:
platform.async_register_entity_service( platform.async_register_entity_service(
service[0], service_name,
make_entity_service_schema(service[1]), make_entity_service_schema(schema),
service[0], service_name,
supports_response=service[2] supports_response=supports_response
) )
class ComputerSwitch(SwitchEntity): class ComputerSwitch(SwitchEntity):
"""Representation of a computer switch.""" """Representation of a computer switch entity."""
def __init__( def __init__(
self, self,
@ -110,216 +83,121 @@ class ComputerSwitch(SwitchEntity):
name: str, name: str,
host: str | None, host: str | None,
mac_address: str, mac_address: str,
# broadcast_address: str | None, dualboot: bool,
# broadcast_port: int | None,
dualboot: bool | False,
username: str, username: str,
password: str, password: str,
port: int | None, port: int | None,
) -> None: ) -> None:
"""Initialize the WOL switch.""" """Initialize the computer switch entity."""
self.hass = hass
self._hass = hass
self._attr_name = name self._attr_name = name
self._host = host
self._mac_address = mac_address
# self._broadcast_address = broadcast_address
# self._broadcast_port = broadcast_port
self._dualboot = dualboot
self._username = username
self._password = password
self._port = port
self._state = False
self._attr_assumed_state = host is None
self._attr_should_poll = bool(not self._attr_assumed_state)
self._attr_unique_id = dr.format_mac(mac_address) self._attr_unique_id = dr.format_mac(mac_address)
self._state = False
self._attr_should_poll = not self._attr_assumed_state
self._attr_extra_state_attributes = {} self._attr_extra_state_attributes = {}
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
self.computer = Computer(host, mac_address, username, password, port, dualboot)
@property @property
def device_info(self) -> DeviceInfo | None: def device_info(self) -> DeviceInfo:
"""Return the device information.""" """Return device info for the registry."""
if self._host is None:
return None
return DeviceInfo( return DeviceInfo(
identifiers={(DOMAIN, self._mac_address)}, identifiers={(DOMAIN, self.computer.mac)},
name=self._attr_name, name=self._attr_name,
manufacturer="Generic", manufacturer="Generic",
model="Computer", model="Computer",
sw_version=utils.get_operating_system_version( sw_version=self.computer.operating_system_version,
self._connection) if self._state else "Unknown", connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)},
connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
) )
@property @property
def icon(self) -> str: def icon(self) -> str:
"""Return the icon to use in the frontend, if any."""
return "mdi:monitor" if self._state else "mdi:monitor-off" return "mdi:monitor" if self._state else "mdi:monitor-off"
@property @property
def is_on(self) -> bool: def is_on(self) -> bool:
"""Return true if the computer switch is on.""" """Return true if the computer is on."""
return self._state return self._state
def turn_on(self, **kwargs: Any) -> None: async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the computer on using wake on lan.""" """Turn the computer on using Wake-on-LAN."""
service_kwargs: dict[str, Any] = {} await self.computer.start()
# if self._broadcast_address is not None:
# service_kwargs["ip_address"] = self._broadcast_address
# if self._broadcast_port is not None:
# service_kwargs["port"] = self._broadcast_port
_LOGGER.debug(
"Send magic packet to mac %s (broadcast: %s, port: %s)",
self._mac_address,
# self._broadcast_address,
# self._broadcast_port,
)
wakeonlan.send_magic_packet(self._mac_address, **service_kwargs)
if self._attr_assumed_state: if self._attr_assumed_state:
self._state = True self._state = True
self.async_write_ha_state() self.async_write_ha_state()
def turn_off(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the computer off using appropriate shutdown command based on running OS and/or distro.""" """Turn the computer off via shutdown command."""
utils.shutdown_system(self._connection) await self.computer.shutdown()
if self._attr_assumed_state: if self._attr_assumed_state:
self._state = False self._state = False
self.async_write_ha_state() self.async_write_ha_state()
def restart_to_windows_from_linux(self) -> None: async def async_update(self) -> None:
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" """Update the state by checking if the computer is on."""
is_on = await self.computer.is_on()
if self.is_on != is_on:
self._state = is_on
# self.async_write_ha_state()
if self._dualboot: # If the computer is on, update its attributes
utils.restart_to_windows_from_linux(self._connection) if is_on:
else: await self.computer.update(is_on)
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
def restart_to_linux_from_windows(self) -> None:
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
if self._dualboot:
# TODO: check for default grub entry and adapt accordingly
utils.restart_system(self._connection)
else:
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
def put_computer_to_sleep(self) -> None:
"""Put the computer to sleep using appropriate sleep command based on running OS and/or distro."""
utils.sleep_system(self._connection)
def start_computer_to_windows(self) -> None:
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
self.turn_on()
if self._dualboot:
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
self._hass.loop.create_task(self.service_restart_to_windows_from_linux())
else:
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
async def service_restart_to_windows_from_linux(self) -> None:
"""Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows."""
while not self.is_on:
await asyncio.sleep(3)
await utils.restart_to_windows_from_linux(self._connection)
def restart_computer(self) -> None:
"""Restart the computer using appropriate restart command based on running OS and/or distro."""
# TODO: check for default grub entry and adapt accordingly
if self._dualboot and not utils.is_unix_system(connection=self._connection):
utils.restart_system(self._connection)
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
self.restart_to_windows_from_linux()
else:
utils.restart_system(self._connection)
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""Change the monitors configuration using a YAML config file."""
if monitors_config is not None and len(monitors_config) > 0:
utils.change_monitors_config(self._connection, monitors_config)
else:
raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.")
def steam_big_picture(self, action: str) -> None:
"""Controls Steam Big Picture mode."""
if action is not None:
utils.steam_big_picture(self._connection, action)
else:
raise HomeAssistantError("The 'action' parameter must be specified.")
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
output_device: str | None = None) -> None:
"""Change the audio configuration using a YAML config file."""
utils.change_audio_config(self._connection, volume, mute, input_device, output_device)
def update(self) -> None:
"""Ping the computer to see if it is online and update the state."""
timeout = 1
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)]
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
self._state = not bool(status)
# Update the state attributes and the connection only if the computer is on
if self._state:
if self._connection is None or not utils.test_connection(self._connection):
self.renew_ssh_connection()
if not self._state:
return
self._attr_extra_state_attributes = { self._attr_extra_state_attributes = {
"operating_system": utils.get_operating_system(self._connection), "operating_system": self.computer.operating_system,
"operating_system_version": utils.get_operating_system_version(self._connection), "operating_system_version": self.computer.operating_system_version,
"mac_address": self._mac_address, "mac_address": self.computer.mac,
"ip_address": self._host, "ip_address": self.computer.host,
"connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True), "connected_devices": get_bluetooth_devices_as_str(self.computer),
} }
def renew_ssh_connection(self) -> None: # Service methods for various functionalities
"""Renew the SSH connection.""" async def restart_to_windows_from_linux(self) -> None:
_LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username) """Restart the computer from Linux to Windows."""
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
if self._connection is not None: async def restart_to_linux_from_windows(self) -> None:
self._connection.close() """Restart the computer from Windows to Linux."""
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
try: async def put_computer_to_sleep(self) -> None:
self._connection = utils.create_ssh_connection(self._host, self._username, self._password) """Put the computer to sleep."""
self._connection.open() await self.computer.put_to_sleep()
except AuthenticationException as error:
_LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error)
self._state = False
except Exception as error:
_LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error)
# Check if the error is due to timeout async def start_computer_to_windows(self) -> None:
if "timed out" in str(error): """Start the computer to Windows after booting into Linux first."""
_LOGGER.warning( await self.computer.start()
"Computer at %s does not respond to the SSH request. Possible causes: might be offline, "
"the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.",
self._host
)
self._state = False async def wait_and_reboot() -> None:
"""Wait until the computer is on, then restart to Windows."""
while not await self.computer.is_on():
await asyncio.sleep(3)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
def debug_info(self) -> ServiceResponse: self.hass.loop.create_task(wait_and_reboot())
"""Prints debug info."""
return utils.get_debug_info(self._connection) async def restart_computer(self) -> None:
"""Restart the computer."""
await self.computer.restart()
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Change the monitor configuration."""
await self.computer.set_monitors_config(monitors_config)
async def steam_big_picture(self, action: str) -> None:
"""Control Steam Big Picture mode."""
await self.computer.steam_big_picture(action)
async def change_audio_config(
self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None, output_device: str | None = None
) -> None:
"""Change the audio configuration."""
await self.computer.set_audio_config(volume, mute, input_device, output_device)
async def debug_info(self) -> ServiceResponse:
"""Return debug information."""
return await format_debug_information(self.computer)

View File

@ -1,533 +0,0 @@
import logging
import re
import fabric2
from fabric2 import Connection
from homeassistant.exceptions import HomeAssistantError
_LOGGER = logging.getLogger(__name__)
# _LOGGER.setLevel(logging.DEBUG)
def create_ssh_connection(host: str, username: str, password: str, port=22):
"""Create an SSH connection to a host using a username and password specified in the config flow."""
conf = fabric2.Config()
conf.run.hide = True
conf.run.warn = True
conf.warn = True
conf.sudo.password = password
conf.password = password
connection = Connection(
host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password},
config=conf
)
_LOGGER.info("Successfully created SSH connection to %s using username %s", host, username)
return connection
def test_connection(connection: Connection):
"""Test the connection to the host by running a simple command."""
try:
connection.run('ls')
return True
except Exception:
return False
def is_unix_system(connection: Connection):
"""Return a boolean based on get_operating_system result."""
return get_operating_system(connection) == "Linux/Unix"
def get_operating_system_version(connection: Connection, is_unix=None):
"""Return the running operating system name and version."""
if is_unix is None:
is_unix = is_unix_system(connection)
if is_unix:
result = connection.run(
"awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo").stdout
if result == "":
result = connection.run("lsb_release -a | awk '/Description/ {print $2, $3, $4}'").stdout
return result
else:
return connection.run(
'for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i').stdout
def get_operating_system(connection: Connection):
"""Return the running operating system type."""
# TODO: might be a better way to do this
result = connection.run("uname")
if result.return_code == 0:
return "Linux/Unix"
else:
return "Windows/Other"
def shutdown_system(connection: Connection, is_unix=None):
"""Shutdown the system."""
if is_unix is None:
is_unix = is_unix_system(connection)
shutdown_commands = {
"unix": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"],
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"]
}
for command in shutdown_commands["unix" if is_unix else "windows"]:
result = connection.run(command)
if result.return_code == 0:
_LOGGER.debug("System shutting down on %s.", connection.host)
connection.close()
return
raise HomeAssistantError(f"Cannot shutdown system running at {connection.host}, all methods failed.")
def restart_system(connection: Connection, is_unix=None):
"""Restart the system."""
if is_unix is None:
is_unix = is_unix_system(connection)
restart_commands = {
"unix": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"],
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"]
}
for command in restart_commands["unix" if is_unix else "windows"]:
result = connection.run(command)
if result.return_code == 0:
_LOGGER.debug("System restarting on %s.", connection.host)
return
raise HomeAssistantError(f"Cannot restart system running at {connection.host}, all methods failed.")
def sleep_system(connection: Connection, is_unix=None):
"""Put the system to sleep."""
if is_unix is None:
is_unix = is_unix_system(connection)
sleep_commands = {
"unix": ["sudo systemctl suspend", "sudo pm-suspend"],
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"]
}
for command in sleep_commands["unix" if is_unix else "windows"]:
result = connection.run(command)
if result.return_code == 0:
_LOGGER.debug("System sleeping on %s.", connection.host)
return
raise HomeAssistantError(f"Cannot put system running at {connection.host} to sleep, all methods failed.")
def get_windows_entry_in_grub(connection: Connection):
"""
Grabs the Windows entry name in GRUB.
Used later with grub-reboot to specify which entry to boot.
"""
commands = [
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"
]
for command in commands:
result = connection.run(command)
if result.return_code == 0 and result.stdout.strip():
_LOGGER.debug("Found Windows entry in GRUB: " + result.stdout.strip())
return result.stdout.strip()
_LOGGER.error("Could not find Windows entry in GRUB for system running at %s.", connection.host)
return None
def restart_to_windows_from_linux(connection: Connection):
"""Restart a running Linux system to Windows."""
if not is_unix_system(connection):
raise HomeAssistantError(f"System running at {connection.host} is not a Linux system.")
windows_entry = get_windows_entry_in_grub(connection)
if windows_entry is not None:
reboot_commands = ["sudo grub-reboot", "sudo grub2-reboot"]
for reboot_command in reboot_commands:
result = connection.run(f"{reboot_command} \"{windows_entry}\"")
if result.return_code == 0:
_LOGGER.debug("Rebooting to Windows")
restart_system(connection)
return
raise HomeAssistantError(f"Failed to restart system running on {connection.host} to Windows from Linux.")
else:
raise HomeAssistantError(f"Could not find Windows entry in grub for system running at {connection.host}.")
def change_monitors_config(connection: Connection, monitors_config: dict):
"""Change monitors configuration on the host (Linux + Gnome, and partial Windows support)."""
if is_unix_system(connection):
command_parts = ["gnome-monitor-config", "set"]
for monitor, settings in monitors_config.items():
if settings.get('enabled', False):
command_parts.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
if 'position' in settings:
command_parts.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
if 'mode' in settings:
command_parts.extend(['-m', settings["mode"]])
if 'scale' in settings:
command_parts.extend(['-s', str(settings["scale"])])
if 'transform' in settings:
command_parts.extend(['-t', settings["transform"]])
command = ' '.join(command_parts)
_LOGGER.debug("Running command: %s", command)
result = connection.run(command)
if result.return_code == 0:
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
else:
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
connection.host)
else:
raise HomeAssistantError("Not implemented yet for Windows OS.")
# TODO: Implement Windows support using NIRCMD
command_parts = ["nircmd.exe", "setdisplay"]
# setdisplay {monitor:index/name} [width] [height] [color bits] {refresh rate} {-updatereg} {-allusers}
for monitor, settings in monitors_config.items():
if settings.get('enabled', False):
command_parts.extend(
[f'{monitor} -primary' if settings.get('primary', False) else f'{monitor} -secondary'])
if 'resolution' in settings:
command_parts.extend([str(settings["resolution"][0]), str(settings["resolution"][1])])
if 'refresh_rate' in settings:
command_parts.extend(['-hz', str(settings["refresh_rate"])])
if 'color_bits' in settings:
command_parts.extend(['-bits', str(settings["color_bits"])])
command = ' '.join(command_parts)
_LOGGER.debug("Running command: %s", command)
result = connection.run(command)
if result.return_code == 0:
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
else:
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
connection.host)
def silent_install_nircmd(connection: Connection):
"""Silently install NIRCMD on a Windows system."""
if not is_unix_system(connection):
download_url = "https://www.nirsoft.net/utils/nircmd.zip"
install_path = f"C:\\Users\\{connection.user}\\AppData\\Local\\EasyComputerManager"
# Download and unzip NIRCMD
download_command = f"powershell -Command \"Invoke-WebRequest -Uri {download_url} -OutFile {install_path}\\nircmd.zip -UseBasicParsing\""
unzip_command = f"powershell -Command \"Expand-Archive {install_path}\\nircmd.zip -DestinationPath {install_path}\""
remove_zip_command = f"powershell -Command \"Remove-Item {install_path}\\nircmd.zip\""
commands = [download_command, unzip_command, remove_zip_command]
for command in commands:
result = connection.run(command)
if result.return_code != 0:
_LOGGER.error("Could not install NIRCMD on system running on %s.", connection.host)
return
def get_monitors_config(connection: Connection) -> dict:
"""Parse the output of the gnome-monitor-config command to get the current monitor configuration."""
if is_unix_system(connection):
result = connection.run("gnome-monitor-config list")
if result.return_code != 0:
raise HomeAssistantError(f"Could not get monitors config on system running at {connection.host}.")
monitors = []
current_monitor = None
for line in result.stdout.split('\n'):
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
if monitor_match:
if current_monitor:
monitors.append(current_monitor)
source, status = monitor_match.groups()
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
elif current_monitor:
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
if display_name_match:
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
elif resolution_match:
# Don't include resolutions under 1280x720
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
# If there are already resolutions in the list, check if the framerate between the last is >1
if len(current_monitor['resolutions']) > 0:
last_resolution = current_monitor['resolutions'][-1]
last_resolution_size = last_resolution.split('@')[0]
this_resolution_size = resolution_match.group(1).split('@')[0]
# Only truncate some framerates if the resolution are the same
if last_resolution_size == this_resolution_size:
last_resolution_framerate = float(last_resolution.split('@')[1])
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
# If the difference between the last resolution framerate and this one is >1, ignore it
if last_resolution_framerate - 1 > this_resolution_framerate:
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# If the resolution is different, this adds the new resolution
# to the list without truncating
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# This is the first resolution, add it to the list
current_monitor['resolutions'].append(resolution_match.group(1))
if current_monitor:
monitors.append(current_monitor)
return monitors
else:
raise HomeAssistantError("Not implemented yet for Windows OS.")
def steam_big_picture(connection: Connection, action: str):
"""Controls Steam in Big Picture mode on the host."""
_LOGGER.debug(f"Running Steam Big Picture action {action} on system running at {connection.host}.")
steam_commands = {
"start": {
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
"windows": "start steam://open/bigpicture"
},
"stop": {
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
# TODO: check for different Steam install paths
},
"exit": {
"unix": None, # TODO: find a way to exit Steam Big Picture
"windows": "nircmd win close title \"Steam Big Picture Mode\""
# TODO: need to test (thx @MasterHidra https://www.reddit.com/r/Steam/comments/5c9l20/comment/k5fmb3k)
}
}
command = steam_commands.get(action)
if command is None:
raise HomeAssistantError(
f"Invalid action {action} for Steam Big Picture on system running at {connection.host}.")
if is_unix_system(connection):
result = connection.run(command.get("unix"))
else:
result = connection.run(command.get("windows"))
if result.return_code != 0:
raise HomeAssistantError(f"Could not {action} Steam Big Picture on system running at {connection.host}.")
def get_audio_config(connection: Connection):
if is_unix_system(connection):
config = {'sinks': [], 'sources': []}
def parse_device_info(lines, device_type):
devices = []
current_device = {}
for line in lines:
if line.startswith(f"{device_type} #"):
if current_device and "Monitor" not in current_device['description']:
devices.append(current_device)
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
elif line.startswith(" Name:"):
current_device['name'] = line.split(":")[1].strip()
elif line.startswith(" State:"):
current_device['state'] = line.split(":")[1].strip()
elif line.startswith(" Description:"):
current_device['description'] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
return devices
# Get sinks
result = connection.run("LANG=en_US.UTF-8 pactl list sinks")
if result.return_code != 0:
raise HomeAssistantError(f"Could not get audio sinks on system running at {connection.host}.")
config['sinks'] = parse_device_info(result.stdout.split('\n'), 'Sink')
# Get sources
result = connection.run("LANG=en_US.UTF-8 pactl list sources")
if result.return_code != 0:
raise HomeAssistantError(f"Could not get audio sources on system running at {connection.host}.")
config['sources'] = parse_device_info(result.stdout.split('\n'), 'Source')
return config
else:
raise HomeAssistantError("Not implemented yet for Windows OS.")
def change_audio_config(connection: Connection, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
output_device: str = "@DEFAULT_SINK@"):
"""Change audio configuration on the host system."""
if is_unix_system(connection):
current_config = get_audio_config(connection)
executable = "pactl"
commands = []
def get_device_id(device_type, user_device):
for device in current_config[device_type]:
if device['description'] == user_device:
return device['name']
return user_device
# Set default sink and source if not specified
if not output_device:
output_device = "@DEFAULT_SINK@"
if not input_device:
input_device = "@DEFAULT_SOURCE@"
# Set default sink if specified
if output_device and output_device != "@DEFAULT_SINK@":
output_device = get_device_id('sinks', output_device)
commands.append(f"{executable} set-default-sink {output_device}")
# Set default source if specified
if input_device and input_device != "@DEFAULT_SOURCE@":
input_device = get_device_id('sources', input_device)
commands.append(f"{executable} set-default-source {input_device}")
# Set sink volume if specified
if volume is not None:
commands.append(f"{executable} set-sink-volume {output_device} {volume}%")
# Set sink and source mute status if specified
if mute is not None:
commands.append(f"{executable} set-sink-mute {output_device} {'yes' if mute else 'no'}")
commands.append(f"{executable} set-source-mute {input_device} {'yes' if mute else 'no'}")
# Execute commands
for command in commands:
_LOGGER.debug("Running command: %s", command)
result = connection.run(command)
if result.return_code != 0:
raise HomeAssistantError(
f"Could not change audio config on system running on {connection.host}, check logs with debug")
else:
raise HomeAssistantError("Not implemented yet for Windows OS.")
def get_debug_info(connection: Connection):
"""Return debug information about the host system."""
data = {}
data_os = {
'name': get_operating_system(connection),
'version': get_operating_system_version(connection),
'is_unix': is_unix_system(connection)
}
data_ssh = {
'is_connected': connection.is_connected,
'username': connection.user,
'host': connection.host,
'port': connection.port
}
data_grub = {
'windows_entry': get_windows_entry_in_grub(connection)
}
data_audio = {
'speakers': get_audio_config(connection).get('sinks'),
'microphones': get_audio_config(connection).get('sources')
}
data['os'] = data_os
data['ssh'] = data_ssh
data['grub'] = data_grub
data['audio'] = data_audio
data['monitors'] = get_monitors_config(connection)
return data
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False):
"""Return a list of Bluetooth devices connected to the host system."""
commands = {
"unix": "bash -c \'bluetoothctl devices | cut -f2 -d\\' \\' | while read uuid; do bluetoothctl info $uuid; done|grep -e \"Device\\|Connected\\|Name\"\'",
"windows": "",
}
if is_unix_system(connection):
result = connection.run(commands["unix"])
if result.return_code != 0:
raise HomeAssistantError(f"Could not get Bluetooth devices on system running at {connection.host}.")
devices = []
current_device = {}
for line in result.stdout.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith("Device"):
if current_device:
devices.append(current_device)
current_device = {"address": line.split()[1]}
elif line.startswith("Name:"):
current_device["name"] = line.split(":")[1].strip()
elif line.startswith("Connected:"):
current_device["connected"] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
if only_connected:
devices = [device for device in devices if device["connected"] == "yes"]
if return_as_string:
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
return devices
else:
raise HomeAssistantError("Not implemented yet for Windows OS.")

View File

@ -1,5 +1,4 @@
fabric2~=3.1.0 wakeonlan~=3.1.0
paramiko~=3.2.0 homeassistant
voluptuous~=0.13.1 paramiko~=3.5.0
wakeonlan~=3.0.0 voluptuous~=0.15.2
homeassistant~=2023.7.2