Compare commits

..

No commits in common. "f82826fc0a82d676c80f14d7ccfbcaa60a469ba8" and "59ff04775c1e92d15b0260e4941e8b00091874d6" have entirely different histories.

3 changed files with 272 additions and 239 deletions

View File

@ -34,83 +34,94 @@ class Computer:
self._connection: SSHClient = SSHClient(host, username, password, port)
asyncio.create_task(self._connection.connect(computer=self))
# asyncio.create_task(self.update())
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
"""Update computer details."""
if not state or not await self.is_on():
LOGGER.debug("Computer is off, skipping update")
return
# Ensure connection is established before updating
await self._ensure_connection_alive(timeout)
# Update tasks
await asyncio.gather(
self._update_operating_system(),
self._update_operating_system_version(),
self._update_desktop_environment(),
self._update_windows_entry_grub(),
self._update_monitors_config(),
self._update_audio_config(),
self._update_bluetooth_devices()
)
async def _ensure_connection_alive(self, timeout: int) -> None:
"""Ensure the SSH connection is alive, reconnect if necessary."""
for _ in range(timeout * 4):
if self._connection.is_connection_alive():
return
await asyncio.sleep(0.25)
if not self._connection.is_connection_alive():
LOGGER.debug(f"Reconnecting to {self.host}")
await self._connection.connect()
if not self._connection.is_connection_alive():
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
raise ConnectionError("SSH connection could not be re-established")
async def _update_operating_system(self) -> None:
"""Update the operating system information."""
async def update_operating_system():
self.operating_system = await self._detect_operating_system()
async def _update_operating_system_version(self) -> None:
"""Update the operating system version."""
async def update_operating_system_version():
self.operating_system_version = (await self.run_action("operating_system_version")).output
async def _update_desktop_environment(self) -> None:
"""Update the desktop environment information."""
async def update_desktop_environment():
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
async def _update_windows_entry_grub(self) -> None:
"""Update Windows entry in GRUB (if applicable)."""
async def update_windows_entry_grub():
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
async def _update_monitors_config(self) -> None:
"""Update monitors configuration."""
async def update_monitors_config():
monitors_config = (await self.run_action("get_monitors_config")).output
if self.operating_system == OSType.LINUX:
output = (await self.run_action("get_monitors_config")).output
self.monitors_config = parse_gnome_monitors_output(output)
# TODO: Implement for Windows if needed
# TODO: add compatibility for KDE/others
self.monitors_config = parse_gnome_monitors_output(monitors_config)
elif self.operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
async def _update_audio_config(self) -> None:
"""Update audio configuration."""
speakers_output = (await self.run_action("get_speakers")).output
microphones_output = (await self.run_action("get_microphones")).output
async def update_audio_config():
speakers_config = (await self.run_action("get_speakers")).output
microphones_config = (await self.run_action("get_microphones")).output
if self.operating_system == OSType.LINUX:
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
# TODO: Implement for Windows
self.audio_config = parse_pactl_output(speakers_config, microphones_config)
elif self.operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
async def _update_bluetooth_devices(self) -> None:
"""Update Bluetooth devices list."""
async def update_bluetooth_devices():
bluetooth_config = await self.run_action("get_bluetooth_devices")
if self.operating_system == OSType.LINUX:
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
# TODO: Implement for Windows
self.bluetooth_devices = parse_bluetoothctl(bluetooth_config)
elif self.operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
if not state or not await self.is_on():
LOGGER.debug("ECM Computer is off, skipping update")
return
else:
# Wait for connection to be established before updating or give up after timeout
for i in range(timeout * 4):
if not self._connection.is_connection_alive():
await asyncio.sleep(0.25)
else:
break
else:
# Reconnect if connection is lost and init is already done
if self.initialized:
await self._connection.connect()
if not self._connection.is_connection_alive():
LOGGER.debug(
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
return
else:
LOGGER.debug(
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
return
await update_operating_system()
await update_operating_system_version()
await update_desktop_environment()
await update_windows_entry_grub()
await update_monitors_config()
await update_audio_config()
await update_bluetooth_devices()
async def _detect_operating_system(self) -> OSType:
"""Detect the operating system by running a uname command."""
result = await self.run_manually("uname")
return OSType.LINUX if result.successful() else OSType.WINDOWS
"""Detect the operating system of the computer."""
uname_result = await self.run_manually("uname")
return OSType.LINUX if uname_result.successful() else OSType.WINDOWS
def is_windows(self) -> bool:
"""Check if the computer is running Windows."""
return self.operating_system == OSType.WINDOWS
def is_linux(self) -> bool:
"""Check if the computer is running Linux."""
return self.operating_system == OSType.LINUX
async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on by pinging it."""
@ -118,20 +129,20 @@ class Computer:
proc = await asyncio.create_subprocess_exec(
*ping_cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
stderr=asyncio.subprocess.DEVNULL,
)
await proc.communicate()
return proc.returncode == 0
async def start(self) -> None:
"""Start the computer using Wake-on-LAN."""
"""Start the computer."""
send_magic_packet(self.mac)
async def shutdown(self) -> None:
"""Shutdown the computer."""
await self.run_action("shutdown")
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
async def restart(self, from_os: OSType = None, to_os: OSType = None) -> None:
"""Restart the computer."""
await self.run_action("restart")
@ -140,68 +151,88 @@ class Computer:
await self.run_action("sleep")
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Set monitors configuration."""
if self.is_linux() and self.desktop_environment == 'gnome':
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
"""Change the monitors configuration."""
if self.is_linux():
# TODO: other DE support
if self.desktop_environment == 'gnome':
await self.run_action("set_monitors_config",
params={"args": format_gnome_monitors_args(monitors_config)})
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
"""Set audio configuration."""
if self.is_linux() and self.desktop_environment == 'gnome':
async def set_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
"""Change the audio configuration."""
if self.is_linux():
# TODO: other DE support
if self.desktop_environment == 'gnome':
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
for command in pactl_commands:
await self.run_action("set_audio_config", params={"args": command})
async def install_nircmd(self) -> None:
"""Install NirCmd tool (Windows specific)."""
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
await self.run_action("install_nircmd", params={
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
"install_path": install_path
})
await self.run_action("install_nircmd", params={"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
"install_path": f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"})
async def steam_big_picture(self, action: str) -> None:
"""Start, stop, or exit Steam Big Picture mode."""
"""Start, stop or exit Steam Big Picture mode."""
await self.run_action(f"{action}_steam_big_picture")
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
raise_on_error: bool = False) -> CommandOutput:
"""Run a predefined action via SSH."""
params = params or {}
action = const.ACTIONS.get(id)
if not action:
LOGGER.error(f"Action {id} not found.")
async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput:
"""Run a predefined command via SSH."""
if params is None:
params = {}
if id not in const.ACTIONS:
return CommandOutput("", 1, "", "Action not found")
if not self.operating_system:
self.operating_system = await self._detect_operating_system()
action = const.ACTIONS[id]
os_commands = action.get(self.operating_system.lower())
if not os_commands:
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
if self.operating_system.lower() in action:
raise_on_error = action.get("raise_on_error", raise_on_error)
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
[os_commands.get("command")])
required_params = []
if "params" in os_commands:
required_params = os_commands.get("params", [])
os_data = action.get(self.operating_system.lower())
if isinstance(os_data, list):
commands = os_data
req_params = []
elif isinstance(os_data, dict):
if "command" in os_data or "commands" in os_data:
commands = os_data.get("commands", [os_data.get("command")])
req_params = os_data.get("params", [])
raise_on_error = os_data.get("raise_on_error", raise_on_error)
elif self.desktop_environment in os_data:
commands = os_data.get(self.desktop_environment).get("commands", [
os_data.get(self.desktop_environment).get("command")])
req_params = os_data.get(self.desktop_environment).get("params", [])
raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error)
else:
raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}")
else:
raise ValueError(f"Action {id} misconfigured/bad format")
# Validate parameters
if sorted(required_params) != sorted(params.keys()):
if sorted(req_params) != sorted(params.keys()):
raise ValueError(f"Invalid/missing parameters for action: {id}")
command_result = None
for command in commands:
for param, value in params.items():
command = command.replace(f"%{param}%", str(value))
command = command.replace(f"%{param}%", value)
result = await self.run_manually(command)
if result.successful():
return result
elif raise_on_error:
command_result = await self.run_manually(command)
if command_result.successful():
LOGGER.debug(f"Command successful: {command}")
return command_result
else:
LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}")
# LOGGER.debug(f"Output: {command_result.output}")
# LOGGER.debug(f"Error: {command_result.error}")
if raise_on_error:
raise ValueError(f"Command failed: {command}")
return result
return command_result
else:
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
async def run_manually(self, command: str) -> CommandOutput:
"""Run a custom command manually via SSH."""

View File

@ -8,88 +8,71 @@ from custom_components.easy_computer_manager.computer import CommandOutput
class SSHClient:
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
def __init__(self, host, username, password, port):
self.host = host
self.username = username
self._password = password
self.port = port
self._connection: Optional[paramiko.SSHClient] = None
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.disconnect()
self._connection = None
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
"""Open an SSH connection using Paramiko asynchronously."""
if self.is_connection_alive():
LOGGER.debug(f"Connection to {self.host} is already active.")
return
self.disconnect() # Ensure any previous connection is closed
self.disconnect()
loop = asyncio.get_running_loop()
try:
# Create the SSH client
client = paramiko.SSHClient()
# Set missing host key policy to automatically accept unknown host keys
# client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# Offload the blocking connect call to a thread
await loop.run_in_executor(None, self._blocking_connect, client)
self._connection = client
LOGGER.debug(f"Connected to {self.host}")
except (OSError, paramiko.SSHException) as exc:
if retried:
await self.connect(retried=True)
else:
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
if not retried:
LOGGER.debug(f"Retrying connection to {self.host}...")
await self.connect(retried=True) # Retry only once
finally:
if computer is not None and hasattr(computer, "initialized"):
if computer is not None:
if hasattr(computer, "initialized"):
computer.initialized = True
def disconnect(self) -> None:
"""Close the SSH connection."""
if self._connection:
if self._connection is not None:
self._connection.close()
LOGGER.debug(f"Disconnected from {self.host}")
self._connection = None
def _blocking_connect(self, client: paramiko.SSHClient):
def _blocking_connect(self, client):
"""Perform the blocking SSH connection using Paramiko."""
client.connect(
hostname=self.host,
self.host,
username=self.username,
password=self._password,
port=self.port,
look_for_keys=False, # Set this to True if using private keys
allow_agent=False
port=self.port
)
async def execute_command(self, command: str) -> CommandOutput:
"""Execute a command on the SSH server asynchronously."""
if not self.is_connection_alive():
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
await self.connect()
try:
# Offload command execution to avoid blocking
loop = asyncio.get_running_loop()
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
stdin, stdout, stderr = self._connection.exec_command(command)
exit_status = stdout.channel.recv_exit_status()
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
except (paramiko.SSHException, EOFError) as exc:
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
LOGGER.debug(f"Failed to execute command on {self.host}: {exc}")
return CommandOutput(command, -1, "", "")
def is_connection_alive(self) -> bool:
"""Check if the SSH connection is still alive."""
"""Check if the connection is still alive asynchronously."""
# use the code below if is_active() returns True
if self._connection is None:
return False
@ -100,5 +83,5 @@ class SSHClient:
self._connection.exec_command('ls', timeout=1)
return True
except Exception as e:
except Exception:
return False

View File

@ -1,53 +1,74 @@
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
from __future__ import annotations
import asyncio
from typing import Any, Dict
from typing import Any
import voluptuous as vol
from homeassistant.components.switch import SwitchEntity
from homeassistant.components.switch import (SwitchEntity)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
)
CONF_HOST,
CONF_MAC,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME, )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.helpers import entity_platform, device_registry as dr
from homeassistant.helpers import (
device_registry as dr,
entity_platform,
)
from homeassistant.helpers.config_validation import make_entity_service_schema
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .computer import OSType, Computer
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
from .const import (
DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
)
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
async def async_setup_entry(
hass: HomeAssistant,
config: ConfigEntry,
async_add_entities: AddEntitiesCallback
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the computer switch from a config entry."""
mac_address = config.data[CONF_MAC]
host = config.data[CONF_HOST]
name = config.data[CONF_NAME]
dualboot = config.data.get("dualboot", False)
username = config.data[CONF_USERNAME]
password = config.data[CONF_PASSWORD]
port = config.data.get(CONF_PORT)
# Retrieve the data from the config flow
mac_address: str = config.data.get(CONF_MAC)
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS)
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT)
host: str = config.data.get(CONF_HOST)
name: str = config.data.get(CONF_NAME)
dualboot: bool = config.data.get("dualboot")
username: str = config.data.get(CONF_USERNAME)
password: str = config.data.get(CONF_PASSWORD)
port: int | None = config.data.get(CONF_PORT)
# Register the computer switch
async_add_entities(
[ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
True
[
ComputerSwitch(
hass,
name,
host,
mac_address,
dualboot,
username,
password,
port,
),
],
host is not None,
)
platform = entity_platform.async_get_current_platform()
# Service registrations
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse)
services = [
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
@ -64,18 +85,18 @@ async def async_setup_entry(
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
]
# Register services with their schemas
for service_name, schema, supports_response in services:
# Register the services that depends on the switch
for service in services:
platform.async_register_entity_service(
service_name,
make_entity_service_schema(schema),
service_name,
supports_response=supports_response
service[0],
make_entity_service_schema(service[1]),
service[0],
supports_response=service[2]
)
class ComputerSwitch(SwitchEntity):
"""Representation of a computer switch entity."""
"""Representation of a computer switch."""
def __init__(
self,
@ -83,24 +104,28 @@ class ComputerSwitch(SwitchEntity):
name: str,
host: str | None,
mac_address: str,
dualboot: bool,
dualboot: bool | False,
username: str,
password: str,
port: int | None,
) -> None:
"""Initialize the computer switch entity."""
self.hass = hass
"""Initialize the WOL switch."""
self._hass = hass
self._attr_name = name
self._attr_unique_id = dr.format_mac(mac_address)
self._state = False
self._attr_should_poll = not self._attr_assumed_state
self._attr_extra_state_attributes = {}
self.computer = Computer(host, mac_address, username, password, port, dualboot)
self._state = False
self._attr_assumed_state = host is None
self._attr_should_poll = bool(not self._attr_assumed_state)
self._attr_unique_id = dr.format_mac(mac_address)
self._attr_extra_state_attributes = {}
@property
def device_info(self) -> DeviceInfo:
"""Return device info for the registry."""
def device_info(self) -> DeviceInfo | None:
"""Return the device information."""
return DeviceInfo(
identifiers={(DOMAIN, self.computer.mac)},
name=self._attr_name,
@ -116,19 +141,16 @@ class ComputerSwitch(SwitchEntity):
@property
def is_on(self) -> bool:
"""Return true if the computer is on."""
return self._state
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the computer on using Wake-on-LAN."""
async def turn_on(self, **kwargs: Any) -> None:
await self.computer.start()
if self._attr_assumed_state:
self._state = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the computer off via shutdown command."""
async def turn_off(self, **kwargs: Any) -> None:
await self.computer.shutdown()
if self._attr_assumed_state:
@ -136,16 +158,13 @@ class ComputerSwitch(SwitchEntity):
self.async_write_ha_state()
async def async_update(self) -> None:
"""Update the state by checking if the computer is on."""
is_on = await self.computer.is_on()
if self.is_on != is_on:
self._state = is_on
# self.async_write_ha_state()
"""Ping the computer to see if it is online and update the state."""
self._state = await self.computer.is_on()
# If the computer is on, update its attributes
if is_on:
await self.computer.update(is_on)
await self.computer.update(self._state)
# Update the state attributes and the connection only if the computer is on
if self._state:
self._attr_extra_state_attributes = {
"operating_system": self.computer.operating_system,
"operating_system_version": self.computer.operating_system_version,
@ -154,50 +173,50 @@ class ComputerSwitch(SwitchEntity):
"connected_devices": get_bluetooth_devices_as_str(self.computer),
}
# Service methods for various functionalities
# Services
async def restart_to_windows_from_linux(self) -> None:
"""Restart the computer from Linux to Windows."""
"""(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
async def restart_to_linux_from_windows(self) -> None:
"""Restart the computer from Windows to Linux."""
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
"""(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
await self.computer.restart()
async def put_computer_to_sleep(self) -> None:
"""Put the computer to sleep."""
"""(Service Handler) Put the computer to sleep."""
await self.computer.put_to_sleep()
async def start_computer_to_windows(self) -> None:
"""Start the computer to Windows after booting into Linux first."""
await self.computer.start()
"""(Service Handler) Start the computer to Windows"""
async def wait_and_reboot() -> None:
"""Wait until the computer is on, then restart to Windows."""
while not await self.computer.is_on():
async def wait_task():
while not self.is_on:
await asyncio.sleep(3)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
self.hass.loop.create_task(wait_and_reboot())
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
await self.computer.start()
self._hass.loop.create_task(wait_task())
async def restart_computer(self) -> None:
"""Restart the computer."""
"""(Service Handler) Restart the computer."""
await self.computer.restart()
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Change the monitor configuration."""
async def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""(Service Handler) Change the monitors configuration using a YAML config file."""
await self.computer.set_monitors_config(monitors_config)
async def steam_big_picture(self, action: str) -> None:
"""Control Steam Big Picture mode."""
"""(Service Handler) Control Steam Big Picture mode."""
await self.computer.steam_big_picture(action)
async def change_audio_config(
self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None, output_device: str | None = None
) -> None:
"""Change the audio configuration."""
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
"""(Service Handler) Change the audio configuration using a YAML config file."""
await self.computer.set_audio_config(volume, mute, input_device, output_device)
async def debug_info(self) -> ServiceResponse:
"""Return debug information."""
"""(Service Handler) Prints debug info."""
return await format_debug_information(self.computer)