diff --git a/custom_components/easy_computer_manager/__init__.py b/custom_components/easy_computer_manager/__init__.py index 99bdf82..4bbe5a9 100644 --- a/custom_components/easy_computer_manager/__init__.py +++ b/custom_components/easy_computer_manager/__init__.py @@ -59,11 +59,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA, ) - hass.async_create_task( - hass.config_entries.async_forward_entry_setup( - entry, "switch" - ) - ) + await hass.config_entries.async_forward_entry_setups(entry, ["switch"]) return True diff --git a/custom_components/easy_computer_manager/computer.py b/custom_components/easy_computer_manager/computer.py new file mode 100644 index 0000000..f8a5039 --- /dev/null +++ b/custom_components/easy_computer_manager/computer.py @@ -0,0 +1,182 @@ +import subprocess as sp + +import paramiko +import wakeonlan + +from custom_components.easy_computer_manager import const, _LOGGER + + +class OSType: + WINDOWS = "Windows" + LINUX = "Linux" + MACOS = "MacOS" + + +class Computer: + def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22, + dualboot: bool = False) -> None: + """Init computer.""" + self.host = host + self.mac = mac + self._username = username + self._password = password + self._port = port + self._dualboot = dualboot + + self._operating_system = None + self._operating_system_version = None + self._windows_entry_grub = None + self._monitors_config = None + self._audio_config = None + self._bluetooth_devices = None + + self.setup() + + async def _open_ssh_connection(self): + """Open an SSH connection.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(self.host, port=self._port, username=self._username, password=self._password) + return client + + def _close_ssh_connection(self, client): + """Close the SSH connection.""" + if client: + client.close() + + def setup(self): + """Setup method that opens an SSH connection and keeps it open for subsequent setup commands.""" + client = self._open_ssh_connection() + + # TODO: run commands here + self._operating_system = OSType.LINUX if self.run_manually( + "uname").return_code == 0 else OSType.WINDOWS # TODO: improve this + self._operating_system_version = self.run_action("operating_system_version").output + self._windows_entry_grub = self.run_action("get_windows_entry_grub").output + self._monitors_config = {} + self._audio_config = {'speakers': None, 'microphones': None} + self._bluetooth_devices = {} + + self._close_ssh_connection(client) + + # Getters + def is_on(self, timeout: int = 1) -> bool: + """Check if the computer is on (ping).""" + ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)] + status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) + return status == 0 + + def get_operating_system(self) -> str: + """Get the operating system of the computer.""" + return self._operating_system + + def get_operating_system_version(self) -> str: + """Get the operating system version of the computer.""" + return self._operating_system_version + + def get_windows_entry_grub(self) -> str: + """Get the Windows entry in the GRUB configuration file.""" + return self._windows_entry_grub + + def get_monitors_config(self) -> str: + """Get the monitors configuration of the computer.""" + return self._monitors_config + + def get_speakers(self) -> str: + """Get the audio configuration of the computer.""" + return self._audio_config.speakers + + def get_microphones(self) -> str: + """Get the audio configuration of the computer.""" + return self._audio_config.microphones + + def get_bluetooth_devices(self, as_str: bool = False) -> str: + """Get the Bluetooth devices of the computer.""" + return self._bluetooth_devices + + def is_dualboot(self) -> bool: + """Check if the computer is dualboot.""" + return self._dualboot + + # Actions + def start(self) -> None: + """Start the computer.""" + wakeonlan.send_magic_packet(self.mac) + + def shutdown(self) -> None: + """Shutdown the computer.""" + self.run_action("shutdown") + + def restart(self, from_os: OSType = None, to_os: OSType = None) -> None: + """Restart the computer.""" + self.run_action("restart") + + def put_to_sleep(self) -> None: + """Put the computer to sleep.""" + self.run_action("sleep") + + def change_monitors_config(self, monitors_config: dict) -> None: + pass + + def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None, + output_device: str | None = None) -> None: + pass + + def install_nircmd(self) -> None: + pass + + def start_steam_big_picture(self) -> None: + pass + + def stop_steam_big_picture(self) -> None: + pass + + def exit_steam_big_picture(self) -> None: + pass + + def run_action(self, command: str, params=None) -> {}: + """Run a command via SSH. Opens a new connection for each command.""" + if params is None: + params = {} + if command not in const.COMMANDS: + _LOGGER.error(f"Invalid command: {command}") + return + + command_template = const.COMMANDS[command] + + # Check if the command has the required parameters + if "params" in command_template: + if sorted(command_template.params) != sorted(params.keys()): + raise ValueError("Invalid parameters") + + # Check if the command is available for the operating system + match self._operating_system: + case OSType.WINDOWS: + command = command_template[OSType.WINDOWS] + case OSType.LINUX: + command = command_template[OSType.LINUX] + case _: + raise ValueError("Invalid operating system") + + # Replace the parameters in the command + for param in params: + command = command.replace(f"%{param}%", params[param]) + + # Open SSH connection, execute command, and close connection + client = self._open_ssh_connection() + stdin, stdout, stderr = client.exec_command(command) + print(stdout.read().decode()) # Print the command output for debugging + self._close_ssh_connection(client) + + return {"output": stdout.read().decode(), "error": stderr.read().decode(), + "return_code": stdout.channel.recv_exit_status()} + + def run_manually(self, command: str) -> {}: + """Run a command manually (not from predefined commands).""" + client = self._open_ssh_connection() + stdin, stdout, stderr = client.exec_command(command) + print(stdout.read().decode()) # Print the command output for debugging + self._close_ssh_connection(client) + + return {"output": stdout.read().decode(), "error": stderr.read().decode(), + "return_code": stdout.channel.recv_exit_status()} diff --git a/custom_components/easy_computer_manager/computer_utils.py b/custom_components/easy_computer_manager/computer_utils.py new file mode 100644 index 0000000..18bb0c4 --- /dev/null +++ b/custom_components/easy_computer_manager/computer_utils.py @@ -0,0 +1,31 @@ +from custom_components.easy_computer_manager.computer import Computer + + +def get_debug_info(computer: Computer): + """Return debug information about the host system.""" + + data = { + 'os': { + 'name': computer.get_operating_system(), + 'version': computer.get_operating_system_version(), + }, + 'connection':{ + 'host': computer.host, + 'mac': computer.mac, + 'username': computer._username, + 'port': computer._port, + 'dualboot': computer._dualboot, + 'is_on': computer.is_on() + }, + 'grub':{ + 'windows_entry': computer.get_windows_entry_grub() + }, + 'audio':{ + 'speakers': computer.get_speakers(), + 'microphones': computer.get_microphones() + }, + 'monitors': computer.get_monitors_config(), + 'bluetooth_devices': computer.get_bluetooth_devices() + } + + return data diff --git a/custom_components/easy_computer_manager/config_flow.py b/custom_components/easy_computer_manager/config_flow.py index 5f7b737..3821320 100644 --- a/custom_components/easy_computer_manager/config_flow.py +++ b/custom_components/easy_computer_manager/config_flow.py @@ -9,7 +9,7 @@ from homeassistant import config_entries, exceptions from homeassistant.core import HomeAssistant from paramiko.ssh_exception import AuthenticationException -from . import utils +from .computer import Computer from .const import DOMAIN _LOGGER = logging.getLogger(__name__) @@ -40,6 +40,8 @@ class Hub: self._name = host self._id = host.lower() + self.computer = Computer(host, "", username, password, port) + @property def hub_id(self) -> str: """ID for dummy.""" @@ -48,8 +50,10 @@ class Hub: async def test_connection(self) -> bool: """Test connectivity to the computer is OK.""" try: - return utils.test_connection( - utils.create_ssh_connection(self._host, self._username, self._password, self._port)) + # TODO: check if reachable + _LOGGER.info("Testing connection to %s", self._host) + return True + except AuthenticationException: return False @@ -63,6 +67,7 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]: hub = Hub(hass, data["host"], data["username"], data["password"], data["port"]) + _LOGGER.info("Validating configuration") if not await hub.test_connection(): raise CannotConnect diff --git a/custom_components/easy_computer_manager/const.py b/custom_components/easy_computer_manager/const.py index 5ef6847..40bd78b 100644 --- a/custom_components/easy_computer_manager/const.py +++ b/custom_components/easy_computer_manager/const.py @@ -11,3 +11,34 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config" SERVICE_STEAM_BIG_PICTURE = "steam_big_picture" SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config" SERVICE_DEBUG_INFO = "debug_info" + + +COMMANDS = { + "operating_system": { + "linux": ["uname"] + }, + "operating_system_version": { + "windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'], + "linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"] + }, + "shutdown": { + "windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"], + "linux": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"] + }, + "restart": { + "windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"], + "linux": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"] + }, + "sleep": { + "windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"], + "linux": ["sudo systemctl suspend", "sudo pm-suspend"] + }, + "get_windows_entry_grub": { + "linux": ["sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg", + "sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"] + }, + "set_grub_entry": { + "params": ["grub-entry"], + "linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"] + } +} \ No newline at end of file diff --git a/custom_components/easy_computer_manager/switch.py b/custom_components/easy_computer_manager/switch.py index 7fb5986..a5d7c84 100644 --- a/custom_components/easy_computer_manager/switch.py +++ b/custom_components/easy_computer_manager/switch.py @@ -4,11 +4,9 @@ from __future__ import annotations import asyncio import logging -import subprocess as sp from typing import Any import voluptuous as vol -import wakeonlan from homeassistant.components.switch import (SwitchEntity) from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( @@ -27,9 +25,9 @@ from homeassistant.helpers import ( from homeassistant.helpers.config_validation import make_entity_service_schema from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback -from paramiko.ssh_exception import AuthenticationException -from . import utils +from . import utils, computer_utils +from .computer import Computer, OSType from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN @@ -110,8 +108,6 @@ class ComputerSwitch(SwitchEntity): name: str, host: str | None, mac_address: str, - # broadcast_address: str | None, - # broadcast_port: int | None, dualboot: bool | False, username: str, password: str, @@ -121,205 +117,115 @@ class ComputerSwitch(SwitchEntity): self._hass = hass self._attr_name = name - self._host = host - self._mac_address = mac_address - # self._broadcast_address = broadcast_address - # self._broadcast_port = broadcast_port - self._dualboot = dualboot - self._username = username - self._password = password - self._port = port + + self.computer = Computer(host, mac_address, username, password, port, dualboot) + self._state = False self._attr_assumed_state = host is None self._attr_should_poll = bool(not self._attr_assumed_state) self._attr_unique_id = dr.format_mac(mac_address) self._attr_extra_state_attributes = {} - self._connection = utils.create_ssh_connection(self._host, self._username, self._password) @property def device_info(self) -> DeviceInfo | None: """Return the device information.""" - if self._host is None: - return None + # TODO: remove this? + # if self._host is None: + # return None return DeviceInfo( - identifiers={(DOMAIN, self._mac_address)}, + identifiers={(DOMAIN, self.computer.mac)}, name=self._attr_name, manufacturer="Generic", model="Computer", - sw_version=utils.get_operating_system_version( - self._connection) if self._state else "Unknown", - connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)}, + sw_version=self.computer.get_operating_system_version(), + connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)}, ) @property def icon(self) -> str: - """Return the icon to use in the frontend, if any.""" return "mdi:monitor" if self._state else "mdi:monitor-off" @property def is_on(self) -> bool: - """Return true if the computer switch is on.""" return self._state def turn_on(self, **kwargs: Any) -> None: - """Turn the computer on using wake on lan.""" - service_kwargs: dict[str, Any] = {} - # if self._broadcast_address is not None: - # service_kwargs["ip_address"] = self._broadcast_address - # if self._broadcast_port is not None: - # service_kwargs["port"] = self._broadcast_port - - _LOGGER.debug( - "Send magic packet to mac %s (broadcast: %s, port: %s)", - self._mac_address, - # self._broadcast_address, - # self._broadcast_port, - ) - - wakeonlan.send_magic_packet(self._mac_address, **service_kwargs) + self.computer.start() if self._attr_assumed_state: self._state = True self.async_write_ha_state() def turn_off(self, **kwargs: Any) -> None: - """Turn the computer off using appropriate shutdown command based on running OS and/or distro.""" - utils.shutdown_system(self._connection) + self.computer.shutdown() if self._attr_assumed_state: self._state = False self.async_write_ha_state() + # Services def restart_to_windows_from_linux(self) -> None: """Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" - - if self._dualboot: - utils.restart_to_windows_from_linux(self._connection) - else: - _LOGGER.error( - "The computer with the IP address %s is not running a dualboot system or hasn't been configured " - "correctly in the UI.", - self._host) + self.computer.restart(OSType.LINUX, OSType.WINDOWS) def restart_to_linux_from_windows(self) -> None: """Restart the computer to Linux from a running Windows by setting grub-reboot and restarting.""" - - if self._dualboot: - # TODO: check for default grub entry and adapt accordingly - utils.restart_system(self._connection) - else: - _LOGGER.error( - "The computer with the IP address %s is not running a dualboot system or hasn't been configured " - "correctly in the UI.", - self._host) + self.computer.restart() def put_computer_to_sleep(self) -> None: - """Put the computer to sleep using appropriate sleep command based on running OS and/or distro.""" - utils.sleep_system(self._connection) + self.computer.setup() def start_computer_to_windows(self) -> None: + async def wait_task(): + while not self.is_on: + pass + # await asyncio.sleep(3) + + self.computer.restart(OSType.LINUX, OSType.WINDOWS) + """Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart.""" - self.turn_on() - - if self._dualboot: - # Wait for the computer to boot using a dedicated thread to avoid blocking the main thread - self._hass.loop.create_task(self.service_restart_to_windows_from_linux()) - - else: - _LOGGER.error( - "The computer with the IP address %s is not running a dualboot system or hasn't been configured " - "correctly in the UI.", - self._host) - - async def service_restart_to_windows_from_linux(self) -> None: - """Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows.""" - while not self.is_on: - await asyncio.sleep(3) - - await utils.restart_to_windows_from_linux(self._connection) + self.computer.start() + self._hass.loop.create_task(wait_task()) def restart_computer(self) -> None: - """Restart the computer using appropriate restart command based on running OS and/or distro.""" - - # TODO: check for default grub entry and adapt accordingly - if self._dualboot and not utils.is_unix_system(connection=self._connection): - utils.restart_system(self._connection) - - # Wait for the computer to boot using a dedicated thread to avoid blocking the main thread - self.restart_to_windows_from_linux() - else: - utils.restart_system(self._connection) + self.computer.restart() def change_monitors_config(self, monitors_config: dict | None = None) -> None: """Change the monitors configuration using a YAML config file.""" - if monitors_config is not None and len(monitors_config) > 0: - utils.change_monitors_config(self._connection, monitors_config) - else: - raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.") + self.computer.change_monitors_config(monitors_config) def steam_big_picture(self, action: str) -> None: - """Controls Steam Big Picture mode.""" + match action: + case "start": + self.computer.start_steam_big_picture() + case "stop": + self.computer.stop_steam_big_picture() + case "exit": + self.computer.exit_steam_big_picture() - if action is not None: - utils.steam_big_picture(self._connection, action) - else: - raise HomeAssistantError("The 'action' parameter must be specified.") def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None, output_device: str | None = None) -> None: """Change the audio configuration using a YAML config file.""" - utils.change_audio_config(self._connection, volume, mute, input_device, output_device) - - def update(self) -> None: - """Ping the computer to see if it is online and update the state.""" - timeout = 1 - ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)] - status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) - self._state = not bool(status) - - # Update the state attributes and the connection only if the computer is on - if self._state: - if self._connection is None or not utils.test_connection(self._connection): - self.renew_ssh_connection() - - if not self._state: - return - - self._attr_extra_state_attributes = { - "operating_system": utils.get_operating_system(self._connection), - "operating_system_version": utils.get_operating_system_version(self._connection), - "mac_address": self._mac_address, - "ip_address": self._host, - "connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True), - } - - def renew_ssh_connection(self) -> None: - """Renew the SSH connection.""" - _LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username) - - if self._connection is not None: - self._connection.close() - - try: - self._connection = utils.create_ssh_connection(self._host, self._username, self._password) - self._connection.open() - except AuthenticationException as error: - _LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error) - self._state = False - except Exception as error: - _LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error) - - # Check if the error is due to timeout - if "timed out" in str(error): - _LOGGER.warning( - "Computer at %s does not respond to the SSH request. Possible causes: might be offline, " - "the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.", - self._host - ) - - self._state = False + self.computer.change_audio_config(volume, mute, input_device, output_device) def debug_info(self) -> ServiceResponse: """Prints debug info.""" - return utils.get_debug_info(self._connection) + return computer_utils.get_debug_info(self.computer) + + + def update(self) -> None: + """Ping the computer to see if it is online and update the state.""" + self._state = self.computer.is_on() + + # Update the state attributes and the connection only if the computer is on + if self._state: + self._attr_extra_state_attributes = { + "operating_system": self.computer.get_operating_system(), + "operating_system_version": self.computer.get_operating_system_version(), + "mac_address": self.computer.mac, + "ip_address": self.computer.host, + "connected_devices": self.computer.get_bluetooth_devices(as_str=True), + } + diff --git a/custom_components/easy_computer_manager/utils.py b/custom_components/easy_computer_manager/utils.py index e519835..a3b99cf 100644 --- a/custom_components/easy_computer_manager/utils.py +++ b/custom_components/easy_computer_manager/utils.py @@ -11,34 +11,6 @@ _LOGGER = logging.getLogger(__name__) # _LOGGER.setLevel(logging.DEBUG) -def create_ssh_connection(host: str, username: str, password: str, port=22): - """Create an SSH connection to a host using a username and password specified in the config flow.""" - conf = fabric2.Config() - conf.run.hide = True - conf.run.warn = True - conf.warn = True - conf.sudo.password = password - conf.password = password - - connection = Connection( - host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password}, - config=conf - ) - - _LOGGER.info("Successfully created SSH connection to %s using username %s", host, username) - - return connection - - -def test_connection(connection: Connection): - """Test the connection to the host by running a simple command.""" - try: - connection.run('ls') - return True - except Exception: - return False - - def is_unix_system(connection: Connection): """Return a boolean based on get_operating_system result.""" return get_operating_system(connection) == "Linux/Unix" @@ -237,7 +209,7 @@ def change_monitors_config(connection: Connection, monitors_config: dict): command = ' '.join(command_parts) _LOGGER.debug("Running command: %s", command) - result = connection.run(command) + result = connection.run_action(command) if result.return_code == 0: _LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host) @@ -459,43 +431,6 @@ def change_audio_config(connection: Connection, volume: int, mute: bool, input_d raise HomeAssistantError("Not implemented yet for Windows OS.") -def get_debug_info(connection: Connection): - """Return debug information about the host system.""" - - data = {} - - data_os = { - 'name': get_operating_system(connection), - 'version': get_operating_system_version(connection), - 'is_unix': is_unix_system(connection) - } - - data_ssh = { - 'is_connected': connection.is_connected, - 'username': connection.user, - 'host': connection.host, - 'port': connection.port - } - - data_grub = { - 'windows_entry': get_windows_entry_in_grub(connection) - } - - data_audio = { - 'speakers': get_audio_config(connection).get('sinks'), - 'microphones': get_audio_config(connection).get('sources') - } - - data['os'] = data_os - data['ssh'] = data_ssh - data['grub'] = data_grub - data['audio'] = data_audio - data['monitors'] = get_monitors_config(connection) - data['bluetooth_devices'] = get_bluetooth_devices(connection) - - return data - - def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False) -> []: commands = { "unix": "bluetoothctl info",