diff --git a/custom_components/easy_computer_manager/computer/__init__.py b/custom_components/easy_computer_manager/computer/__init__.py index d8a3ea9..d61075d 100644 --- a/custom_components/easy_computer_manager/computer/__init__.py +++ b/custom_components/easy_computer_manager/computer/__init__.py @@ -34,94 +34,83 @@ class Computer: self._connection: SSHClient = SSHClient(host, username, password, port) asyncio.create_task(self._connection.connect(computer=self)) - # asyncio.create_task(self.update()) async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None: """Update computer details.""" - - async def update_operating_system(): - self.operating_system = await self._detect_operating_system() - - async def update_operating_system_version(): - self.operating_system_version = (await self.run_action("operating_system_version")).output - - async def update_desktop_environment(): - self.desktop_environment = (await self.run_action("desktop_environment")).output.lower() - - async def update_windows_entry_grub(): - self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output - - async def update_monitors_config(): - monitors_config = (await self.run_action("get_monitors_config")).output - if self.operating_system == OSType.LINUX: - # TODO: add compatibility for KDE/others - self.monitors_config = parse_gnome_monitors_output(monitors_config) - elif self.operating_system == OSType.WINDOWS: - # TODO: implement for Windows - pass - - async def update_audio_config(): - speakers_config = (await self.run_action("get_speakers")).output - microphones_config = (await self.run_action("get_microphones")).output - - if self.operating_system == OSType.LINUX: - self.audio_config = parse_pactl_output(speakers_config, microphones_config) - elif self.operating_system == OSType.WINDOWS: - # TODO: implement for Windows - pass - - async def update_bluetooth_devices(): - bluetooth_config = await self.run_action("get_bluetooth_devices") - if self.operating_system == OSType.LINUX: - self.bluetooth_devices = parse_bluetoothctl(bluetooth_config) - elif self.operating_system == OSType.WINDOWS: - # TODO: implement for Windows - pass - if not state or not await self.is_on(): - LOGGER.debug("ECM Computer is off, skipping update") + LOGGER.debug("Computer is off, skipping update") return - else: - # Wait for connection to be established before updating or give up after timeout - for i in range(timeout * 4): - if not self._connection.is_connection_alive(): - await asyncio.sleep(0.25) - else: - break - else: - # Reconnect if connection is lost and init is already done - if self.initialized: - await self._connection.connect() - if not self._connection.is_connection_alive(): - LOGGER.debug( - f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update") - return - else: - LOGGER.debug( - f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update") - return + # Ensure connection is established before updating + await self._ensure_connection_alive(timeout) - await update_operating_system() - await update_operating_system_version() - await update_desktop_environment() - await update_windows_entry_grub() - await update_monitors_config() - await update_audio_config() - await update_bluetooth_devices() + # Update tasks + await asyncio.gather( + self._update_operating_system(), + self._update_operating_system_version(), + self._update_desktop_environment(), + self._update_windows_entry_grub(), + self._update_monitors_config(), + self._update_audio_config(), + self._update_bluetooth_devices() + ) + + async def _ensure_connection_alive(self, timeout: int) -> None: + """Ensure the SSH connection is alive, reconnect if necessary.""" + for _ in range(timeout * 4): + if self._connection.is_connection_alive(): + return + await asyncio.sleep(0.25) + + if not self._connection.is_connection_alive(): + LOGGER.debug(f"Reconnecting to {self.host}") + await self._connection.connect() + if not self._connection.is_connection_alive(): + LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s") + raise ConnectionError("SSH connection could not be re-established") + + async def _update_operating_system(self) -> None: + """Update the operating system information.""" + self.operating_system = await self._detect_operating_system() + + async def _update_operating_system_version(self) -> None: + """Update the operating system version.""" + self.operating_system_version = (await self.run_action("operating_system_version")).output + + async def _update_desktop_environment(self) -> None: + """Update the desktop environment information.""" + self.desktop_environment = (await self.run_action("desktop_environment")).output.lower() + + async def _update_windows_entry_grub(self) -> None: + """Update Windows entry in GRUB (if applicable).""" + self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output + + async def _update_monitors_config(self) -> None: + """Update monitors configuration.""" + if self.operating_system == OSType.LINUX: + output = (await self.run_action("get_monitors_config")).output + self.monitors_config = parse_gnome_monitors_output(output) + # TODO: Implement for Windows if needed + + async def _update_audio_config(self) -> None: + """Update audio configuration.""" + speakers_output = (await self.run_action("get_speakers")).output + microphones_output = (await self.run_action("get_microphones")).output + + if self.operating_system == OSType.LINUX: + self.audio_config = parse_pactl_output(speakers_output, microphones_output) + # TODO: Implement for Windows + + async def _update_bluetooth_devices(self) -> None: + """Update Bluetooth devices list.""" + if self.operating_system == OSType.LINUX: + self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices")) + # TODO: Implement for Windows async def _detect_operating_system(self) -> OSType: - """Detect the operating system of the computer.""" - uname_result = await self.run_manually("uname") - return OSType.LINUX if uname_result.successful() else OSType.WINDOWS - - def is_windows(self) -> bool: - """Check if the computer is running Windows.""" - return self.operating_system == OSType.WINDOWS - - def is_linux(self) -> bool: - """Check if the computer is running Linux.""" - return self.operating_system == OSType.LINUX + """Detect the operating system by running a uname command.""" + result = await self.run_manually("uname") + return OSType.LINUX if result.successful() else OSType.WINDOWS async def is_on(self, timeout: int = 1) -> bool: """Check if the computer is on by pinging it.""" @@ -129,20 +118,20 @@ class Computer: proc = await asyncio.create_subprocess_exec( *ping_cmd, stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL ) await proc.communicate() return proc.returncode == 0 async def start(self) -> None: - """Start the computer.""" + """Start the computer using Wake-on-LAN.""" send_magic_packet(self.mac) async def shutdown(self) -> None: """Shutdown the computer.""" await self.run_action("shutdown") - async def restart(self, from_os: OSType = None, to_os: OSType = None) -> None: + async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None: """Restart the computer.""" await self.run_action("restart") @@ -151,89 +140,69 @@ class Computer: await self.run_action("sleep") async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None: - """Change the monitors configuration.""" - if self.is_linux(): - # TODO: other DE support - if self.desktop_environment == 'gnome': - await self.run_action("set_monitors_config", - params={"args": format_gnome_monitors_args(monitors_config)}) + """Set monitors configuration.""" + if self.is_linux() and self.desktop_environment == 'gnome': + await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)}) - async def set_audio_config(self, volume: int | None = None, mute: bool | None = None, - input_device: str | None = None, - output_device: str | None = None) -> None: - """Change the audio configuration.""" - if self.is_linux(): - # TODO: other DE support - if self.desktop_environment == 'gnome': - pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device) - for command in pactl_commands: - await self.run_action("set_audio_config", params={"args": command}) + async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None, + input_device: Optional[str] = None, output_device: Optional[str] = None) -> None: + """Set audio configuration.""" + if self.is_linux() and self.desktop_environment == 'gnome': + pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device) + for command in pactl_commands: + await self.run_action("set_audio_config", params={"args": command}) async def install_nircmd(self) -> None: """Install NirCmd tool (Windows specific).""" - await self.run_action("install_nircmd", params={"download_url": "https://www.nirsoft.net/utils/nircmd.zip", - "install_path": f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"}) + install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager" + await self.run_action("install_nircmd", params={ + "download_url": "https://www.nirsoft.net/utils/nircmd.zip", + "install_path": install_path + }) async def steam_big_picture(self, action: str) -> None: - """Start, stop or exit Steam Big Picture mode.""" + """Start, stop, or exit Steam Big Picture mode.""" await self.run_action(f"{action}_steam_big_picture") - async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput: - """Run a predefined command via SSH.""" - if params is None: - params = {} - if id not in const.ACTIONS: + async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None, + raise_on_error: bool = False) -> CommandOutput: + """Run a predefined action via SSH.""" + params = params or {} + + action = const.ACTIONS.get(id) + if not action: + LOGGER.error(f"Action {id} not found.") return CommandOutput("", 1, "", "Action not found") - action = const.ACTIONS[id] + if not self.operating_system: + self.operating_system = await self._detect_operating_system() - if self.operating_system.lower() in action: - raise_on_error = action.get("raise_on_error", raise_on_error) - - os_data = action.get(self.operating_system.lower()) - if isinstance(os_data, list): - commands = os_data - req_params = [] - elif isinstance(os_data, dict): - if "command" in os_data or "commands" in os_data: - commands = os_data.get("commands", [os_data.get("command")]) - req_params = os_data.get("params", []) - raise_on_error = os_data.get("raise_on_error", raise_on_error) - elif self.desktop_environment in os_data: - commands = os_data.get(self.desktop_environment).get("commands", [ - os_data.get(self.desktop_environment).get("command")]) - req_params = os_data.get(self.desktop_environment).get("params", []) - raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error) - else: - raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}") - else: - raise ValueError(f"Action {id} misconfigured/bad format") - - if sorted(req_params) != sorted(params.keys()): - raise ValueError(f"Invalid/missing parameters for action: {id}") - - command_result = None - for command in commands: - for param, value in params.items(): - command = command.replace(f"%{param}%", value) - - command_result = await self.run_manually(command) - if command_result.successful(): - LOGGER.debug(f"Command successful: {command}") - return command_result - else: - LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}") - # LOGGER.debug(f"Output: {command_result.output}") - # LOGGER.debug(f"Error: {command_result.error}") - - if raise_on_error: - raise ValueError(f"Command failed: {command}") - - return command_result - - else: + os_commands = action.get(self.operating_system.lower()) + if not os_commands: raise ValueError(f"Action {id} not supported for OS: {self.operating_system}") + commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands", + [os_commands.get("command")]) + required_params = [] + if "params" in os_commands: + required_params = os_commands.get("params", []) + + # Validate parameters + if sorted(required_params) != sorted(params.keys()): + raise ValueError(f"Invalid/missing parameters for action: {id}") + + for command in commands: + for param, value in params.items(): + command = command.replace(f"%{param}%", str(value)) + + result = await self.run_manually(command) + if result.successful(): + return result + elif raise_on_error: + raise ValueError(f"Command failed: {command}") + + return result + async def run_manually(self, command: str) -> CommandOutput: """Run a custom command manually via SSH.""" return await self._connection.execute_command(command) diff --git a/custom_components/easy_computer_manager/switch.py b/custom_components/easy_computer_manager/switch.py index 096c933..727a369 100644 --- a/custom_components/easy_computer_manager/switch.py +++ b/custom_components/easy_computer_manager/switch.py @@ -1,74 +1,53 @@ -# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component) - from __future__ import annotations import asyncio -from typing import Any +from typing import Any, Dict import voluptuous as vol -from homeassistant.components.switch import (SwitchEntity) +from homeassistant.components.switch import SwitchEntity from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( - CONF_HOST, - CONF_MAC, - CONF_NAME, - CONF_PASSWORD, - CONF_PORT, - CONF_USERNAME, ) -from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse -from homeassistant.helpers import ( - device_registry as dr, - entity_platform, + CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME, ) +from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse +from homeassistant.helpers import entity_platform, device_registry as dr from homeassistant.helpers.config_validation import make_entity_service_schema from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from .computer import OSType, Computer from .computer.utils import format_debug_information, get_bluetooth_devices_as_str -from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ - SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ - SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN +from .const import ( + DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, + SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, + SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG, + SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO +) async def async_setup_entry( hass: HomeAssistant, config: ConfigEntry, - async_add_entities: AddEntitiesCallback, + async_add_entities: AddEntitiesCallback ) -> None: - # Retrieve the data from the config flow - mac_address: str = config.data.get(CONF_MAC) - # broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS) - # broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT) - host: str = config.data.get(CONF_HOST) - name: str = config.data.get(CONF_NAME) - dualboot: bool = config.data.get("dualboot") - username: str = config.data.get(CONF_USERNAME) - password: str = config.data.get(CONF_PASSWORD) - port: int | None = config.data.get(CONF_PORT) + """Set up the computer switch from a config entry.""" + mac_address = config.data[CONF_MAC] + host = config.data[CONF_HOST] + name = config.data[CONF_NAME] + dualboot = config.data.get("dualboot", False) + username = config.data[CONF_USERNAME] + password = config.data[CONF_PASSWORD] + port = config.data.get(CONF_PORT) - # Register the computer switch async_add_entities( - [ - ComputerSwitch( - hass, - name, - host, - mac_address, - dualboot, - username, - password, - port, - ), - ], - host is not None, + [ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)], + True ) platform = entity_platform.async_get_current_platform() - # Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse) + # Service registrations services = [ - (SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE), (SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE), @@ -85,18 +64,18 @@ async def async_setup_entry( (SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY), ] - # Register the services that depends on the switch - for service in services: + # Register services with their schemas + for service_name, schema, supports_response in services: platform.async_register_entity_service( - service[0], - make_entity_service_schema(service[1]), - service[0], - supports_response=service[2] + service_name, + make_entity_service_schema(schema), + service_name, + supports_response=supports_response ) class ComputerSwitch(SwitchEntity): - """Representation of a computer switch.""" + """Representation of a computer switch entity.""" def __init__( self, @@ -104,28 +83,24 @@ class ComputerSwitch(SwitchEntity): name: str, host: str | None, mac_address: str, - dualboot: bool | False, + dualboot: bool, username: str, password: str, port: int | None, ) -> None: - """Initialize the WOL switch.""" - - self._hass = hass + """Initialize the computer switch entity.""" + self.hass = hass self._attr_name = name + self._attr_unique_id = dr.format_mac(mac_address) + self._state = False + self._attr_should_poll = not self._attr_assumed_state + self._attr_extra_state_attributes = {} self.computer = Computer(host, mac_address, username, password, port, dualboot) - self._state = False - self._attr_assumed_state = host is None - self._attr_should_poll = bool(not self._attr_assumed_state) - self._attr_unique_id = dr.format_mac(mac_address) - self._attr_extra_state_attributes = {} - @property - def device_info(self) -> DeviceInfo | None: - """Return the device information.""" - + def device_info(self) -> DeviceInfo: + """Return device info for the registry.""" return DeviceInfo( identifiers={(DOMAIN, self.computer.mac)}, name=self._attr_name, @@ -141,16 +116,19 @@ class ComputerSwitch(SwitchEntity): @property def is_on(self) -> bool: + """Return true if the computer is on.""" return self._state - async def turn_on(self, **kwargs: Any) -> None: + async def async_turn_on(self, **kwargs: Any) -> None: + """Turn the computer on using Wake-on-LAN.""" await self.computer.start() if self._attr_assumed_state: self._state = True self.async_write_ha_state() - async def turn_off(self, **kwargs: Any) -> None: + async def async_turn_off(self, **kwargs: Any) -> None: + """Turn the computer off via shutdown command.""" await self.computer.shutdown() if self._attr_assumed_state: @@ -158,13 +136,16 @@ class ComputerSwitch(SwitchEntity): self.async_write_ha_state() async def async_update(self) -> None: - """Ping the computer to see if it is online and update the state.""" - self._state = await self.computer.is_on() + """Update the state by checking if the computer is on.""" + is_on = await self.computer.is_on() + if self.is_on != is_on: + self._state = is_on + # self.async_write_ha_state() - await self.computer.update(self._state) + # If the computer is on, update its attributes + if is_on: + await self.computer.update(is_on) - # Update the state attributes and the connection only if the computer is on - if self._state: self._attr_extra_state_attributes = { "operating_system": self.computer.operating_system, "operating_system_version": self.computer.operating_system_version, @@ -173,50 +154,50 @@ class ComputerSwitch(SwitchEntity): "connected_devices": get_bluetooth_devices_as_str(self.computer), } - # Services + # Service methods for various functionalities async def restart_to_windows_from_linux(self) -> None: - """(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" + """Restart the computer from Linux to Windows.""" await self.computer.restart(OSType.LINUX, OSType.WINDOWS) async def restart_to_linux_from_windows(self) -> None: - """(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting.""" - await self.computer.restart() + """Restart the computer from Windows to Linux.""" + await self.computer.restart(OSType.WINDOWS, OSType.LINUX) async def put_computer_to_sleep(self) -> None: - """(Service Handler) Put the computer to sleep.""" + """Put the computer to sleep.""" await self.computer.put_to_sleep() async def start_computer_to_windows(self) -> None: - """(Service Handler) Start the computer to Windows""" + """Start the computer to Windows after booting into Linux first.""" + await self.computer.start() - async def wait_task(): - while not self.is_on: + async def wait_and_reboot() -> None: + """Wait until the computer is on, then restart to Windows.""" + while not await self.computer.is_on(): await asyncio.sleep(3) - await self.computer.restart(OSType.LINUX, OSType.WINDOWS) - """Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart.""" - await self.computer.start() - self._hass.loop.create_task(wait_task()) + self.hass.loop.create_task(wait_and_reboot()) async def restart_computer(self) -> None: - """(Service Handler) Restart the computer.""" + """Restart the computer.""" await self.computer.restart() - async def change_monitors_config(self, monitors_config: dict | None = None) -> None: - """(Service Handler) Change the monitors configuration using a YAML config file.""" + async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None: + """Change the monitor configuration.""" await self.computer.set_monitors_config(monitors_config) async def steam_big_picture(self, action: str) -> None: - """(Service Handler) Control Steam Big Picture mode.""" + """Control Steam Big Picture mode.""" await self.computer.steam_big_picture(action) - async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, - input_device: str | None = None, - output_device: str | None = None) -> None: - """(Service Handler) Change the audio configuration using a YAML config file.""" + async def change_audio_config( + self, volume: int | None = None, mute: bool | None = None, + input_device: str | None = None, output_device: str | None = None + ) -> None: + """Change the audio configuration.""" await self.computer.set_audio_config(volume, mute, input_device, output_device) async def debug_info(self) -> ServiceResponse: - """(Service Handler) Prints debug info.""" + """Return debug information.""" return await format_debug_information(self.computer)