Compare commits

...

2 Commits

Author SHA1 Message Date
f82826fc0a
improve structure and refactor 2024-10-19 12:38:26 +02:00
67df62ff91
improve ssh client class 2024-10-19 12:02:45 +02:00
3 changed files with 240 additions and 273 deletions

View File

@ -34,94 +34,83 @@ class Computer:
self._connection: SSHClient = SSHClient(host, username, password, port) self._connection: SSHClient = SSHClient(host, username, password, port)
asyncio.create_task(self._connection.connect(computer=self)) asyncio.create_task(self._connection.connect(computer=self))
# asyncio.create_task(self.update())
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None: async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
"""Update computer details.""" """Update computer details."""
if not state or not await self.is_on():
LOGGER.debug("Computer is off, skipping update")
return
async def update_operating_system(): # Ensure connection is established before updating
await self._ensure_connection_alive(timeout)
# Update tasks
await asyncio.gather(
self._update_operating_system(),
self._update_operating_system_version(),
self._update_desktop_environment(),
self._update_windows_entry_grub(),
self._update_monitors_config(),
self._update_audio_config(),
self._update_bluetooth_devices()
)
async def _ensure_connection_alive(self, timeout: int) -> None:
"""Ensure the SSH connection is alive, reconnect if necessary."""
for _ in range(timeout * 4):
if self._connection.is_connection_alive():
return
await asyncio.sleep(0.25)
if not self._connection.is_connection_alive():
LOGGER.debug(f"Reconnecting to {self.host}")
await self._connection.connect()
if not self._connection.is_connection_alive():
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
raise ConnectionError("SSH connection could not be re-established")
async def _update_operating_system(self) -> None:
"""Update the operating system information."""
self.operating_system = await self._detect_operating_system() self.operating_system = await self._detect_operating_system()
async def update_operating_system_version(): async def _update_operating_system_version(self) -> None:
"""Update the operating system version."""
self.operating_system_version = (await self.run_action("operating_system_version")).output self.operating_system_version = (await self.run_action("operating_system_version")).output
async def update_desktop_environment(): async def _update_desktop_environment(self) -> None:
"""Update the desktop environment information."""
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower() self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
async def update_windows_entry_grub(): async def _update_windows_entry_grub(self) -> None:
"""Update Windows entry in GRUB (if applicable)."""
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
async def update_monitors_config(): async def _update_monitors_config(self) -> None:
monitors_config = (await self.run_action("get_monitors_config")).output """Update monitors configuration."""
if self.operating_system == OSType.LINUX: if self.operating_system == OSType.LINUX:
# TODO: add compatibility for KDE/others output = (await self.run_action("get_monitors_config")).output
self.monitors_config = parse_gnome_monitors_output(monitors_config) self.monitors_config = parse_gnome_monitors_output(output)
elif self.operating_system == OSType.WINDOWS: # TODO: Implement for Windows if needed
# TODO: implement for Windows
pass
async def update_audio_config(): async def _update_audio_config(self) -> None:
speakers_config = (await self.run_action("get_speakers")).output """Update audio configuration."""
microphones_config = (await self.run_action("get_microphones")).output speakers_output = (await self.run_action("get_speakers")).output
microphones_output = (await self.run_action("get_microphones")).output
if self.operating_system == OSType.LINUX: if self.operating_system == OSType.LINUX:
self.audio_config = parse_pactl_output(speakers_config, microphones_config) self.audio_config = parse_pactl_output(speakers_output, microphones_output)
elif self.operating_system == OSType.WINDOWS: # TODO: Implement for Windows
# TODO: implement for Windows
pass
async def update_bluetooth_devices(): async def _update_bluetooth_devices(self) -> None:
bluetooth_config = await self.run_action("get_bluetooth_devices") """Update Bluetooth devices list."""
if self.operating_system == OSType.LINUX: if self.operating_system == OSType.LINUX:
self.bluetooth_devices = parse_bluetoothctl(bluetooth_config) self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
elif self.operating_system == OSType.WINDOWS: # TODO: Implement for Windows
# TODO: implement for Windows
pass
if not state or not await self.is_on():
LOGGER.debug("ECM Computer is off, skipping update")
return
else:
# Wait for connection to be established before updating or give up after timeout
for i in range(timeout * 4):
if not self._connection.is_connection_alive():
await asyncio.sleep(0.25)
else:
break
else:
# Reconnect if connection is lost and init is already done
if self.initialized:
await self._connection.connect()
if not self._connection.is_connection_alive():
LOGGER.debug(
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
return
else:
LOGGER.debug(
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
return
await update_operating_system()
await update_operating_system_version()
await update_desktop_environment()
await update_windows_entry_grub()
await update_monitors_config()
await update_audio_config()
await update_bluetooth_devices()
async def _detect_operating_system(self) -> OSType: async def _detect_operating_system(self) -> OSType:
"""Detect the operating system of the computer.""" """Detect the operating system by running a uname command."""
uname_result = await self.run_manually("uname") result = await self.run_manually("uname")
return OSType.LINUX if uname_result.successful() else OSType.WINDOWS return OSType.LINUX if result.successful() else OSType.WINDOWS
def is_windows(self) -> bool:
"""Check if the computer is running Windows."""
return self.operating_system == OSType.WINDOWS
def is_linux(self) -> bool:
"""Check if the computer is running Linux."""
return self.operating_system == OSType.LINUX
async def is_on(self, timeout: int = 1) -> bool: async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on by pinging it.""" """Check if the computer is on by pinging it."""
@ -129,20 +118,20 @@ class Computer:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*ping_cmd, *ping_cmd,
stdout=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL
) )
await proc.communicate() await proc.communicate()
return proc.returncode == 0 return proc.returncode == 0
async def start(self) -> None: async def start(self) -> None:
"""Start the computer.""" """Start the computer using Wake-on-LAN."""
send_magic_packet(self.mac) send_magic_packet(self.mac)
async def shutdown(self) -> None: async def shutdown(self) -> None:
"""Shutdown the computer.""" """Shutdown the computer."""
await self.run_action("shutdown") await self.run_action("shutdown")
async def restart(self, from_os: OSType = None, to_os: OSType = None) -> None: async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
"""Restart the computer.""" """Restart the computer."""
await self.run_action("restart") await self.run_action("restart")
@ -151,88 +140,68 @@ class Computer:
await self.run_action("sleep") await self.run_action("sleep")
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None: async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Change the monitors configuration.""" """Set monitors configuration."""
if self.is_linux(): if self.is_linux() and self.desktop_environment == 'gnome':
# TODO: other DE support await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
if self.desktop_environment == 'gnome':
await self.run_action("set_monitors_config",
params={"args": format_gnome_monitors_args(monitors_config)})
async def set_audio_config(self, volume: int | None = None, mute: bool | None = None, async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
input_device: str | None = None, input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
output_device: str | None = None) -> None: """Set audio configuration."""
"""Change the audio configuration.""" if self.is_linux() and self.desktop_environment == 'gnome':
if self.is_linux():
# TODO: other DE support
if self.desktop_environment == 'gnome':
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device) pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
for command in pactl_commands: for command in pactl_commands:
await self.run_action("set_audio_config", params={"args": command}) await self.run_action("set_audio_config", params={"args": command})
async def install_nircmd(self) -> None: async def install_nircmd(self) -> None:
"""Install NirCmd tool (Windows specific).""" """Install NirCmd tool (Windows specific)."""
await self.run_action("install_nircmd", params={"download_url": "https://www.nirsoft.net/utils/nircmd.zip", install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
"install_path": f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"}) await self.run_action("install_nircmd", params={
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
"install_path": install_path
})
async def steam_big_picture(self, action: str) -> None: async def steam_big_picture(self, action: str) -> None:
"""Start, stop or exit Steam Big Picture mode.""" """Start, stop, or exit Steam Big Picture mode."""
await self.run_action(f"{action}_steam_big_picture") await self.run_action(f"{action}_steam_big_picture")
async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput: async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
"""Run a predefined command via SSH.""" raise_on_error: bool = False) -> CommandOutput:
if params is None: """Run a predefined action via SSH."""
params = {} params = params or {}
if id not in const.ACTIONS:
action = const.ACTIONS.get(id)
if not action:
LOGGER.error(f"Action {id} not found.")
return CommandOutput("", 1, "", "Action not found") return CommandOutput("", 1, "", "Action not found")
action = const.ACTIONS[id] if not self.operating_system:
self.operating_system = await self._detect_operating_system()
if self.operating_system.lower() in action: os_commands = action.get(self.operating_system.lower())
raise_on_error = action.get("raise_on_error", raise_on_error) if not os_commands:
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
os_data = action.get(self.operating_system.lower()) commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
if isinstance(os_data, list): [os_commands.get("command")])
commands = os_data required_params = []
req_params = [] if "params" in os_commands:
elif isinstance(os_data, dict): required_params = os_commands.get("params", [])
if "command" in os_data or "commands" in os_data:
commands = os_data.get("commands", [os_data.get("command")])
req_params = os_data.get("params", [])
raise_on_error = os_data.get("raise_on_error", raise_on_error)
elif self.desktop_environment in os_data:
commands = os_data.get(self.desktop_environment).get("commands", [
os_data.get(self.desktop_environment).get("command")])
req_params = os_data.get(self.desktop_environment).get("params", [])
raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error)
else:
raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}")
else:
raise ValueError(f"Action {id} misconfigured/bad format")
if sorted(req_params) != sorted(params.keys()): # Validate parameters
if sorted(required_params) != sorted(params.keys()):
raise ValueError(f"Invalid/missing parameters for action: {id}") raise ValueError(f"Invalid/missing parameters for action: {id}")
command_result = None
for command in commands: for command in commands:
for param, value in params.items(): for param, value in params.items():
command = command.replace(f"%{param}%", value) command = command.replace(f"%{param}%", str(value))
command_result = await self.run_manually(command) result = await self.run_manually(command)
if command_result.successful(): if result.successful():
LOGGER.debug(f"Command successful: {command}") return result
return command_result elif raise_on_error:
else:
LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}")
# LOGGER.debug(f"Output: {command_result.output}")
# LOGGER.debug(f"Error: {command_result.error}")
if raise_on_error:
raise ValueError(f"Command failed: {command}") raise ValueError(f"Command failed: {command}")
return command_result return result
else:
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
async def run_manually(self, command: str) -> CommandOutput: async def run_manually(self, command: str) -> CommandOutput:
"""Run a custom command manually via SSH.""" """Run a custom command manually via SSH."""

View File

@ -8,71 +8,88 @@ from custom_components.easy_computer_manager.computer import CommandOutput
class SSHClient: class SSHClient:
def __init__(self, host, username, password, port): def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
self.host = host self.host = host
self.username = username self.username = username
self._password = password self._password = password
self.port = port self.port = port
self._connection = None self._connection: Optional[paramiko.SSHClient] = None
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.disconnect()
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None: async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
"""Open an SSH connection using Paramiko asynchronously.""" """Open an SSH connection using Paramiko asynchronously."""
self.disconnect() if self.is_connection_alive():
LOGGER.debug(f"Connection to {self.host} is already active.")
return
self.disconnect() # Ensure any previous connection is closed
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
try:
# Create the SSH client
client = paramiko.SSHClient() client = paramiko.SSHClient()
# Set missing host key policy to automatically accept unknown host keys # Set missing host key policy to automatically accept unknown host keys
# client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# Offload the blocking connect call to a thread # Offload the blocking connect call to a thread
await loop.run_in_executor(None, self._blocking_connect, client) await loop.run_in_executor(None, self._blocking_connect, client)
self._connection = client self._connection = client
LOGGER.debug(f"Connected to {self.host}")
except (OSError, paramiko.SSHException) as exc: except (OSError, paramiko.SSHException) as exc:
if retried:
await self.connect(retried=True)
else:
LOGGER.debug(f"Failed to connect to {self.host}: {exc}") LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
if not retried:
LOGGER.debug(f"Retrying connection to {self.host}...")
await self.connect(retried=True) # Retry only once
finally: finally:
if computer is not None: if computer is not None and hasattr(computer, "initialized"):
if hasattr(computer, "initialized"):
computer.initialized = True computer.initialized = True
def disconnect(self) -> None: def disconnect(self) -> None:
"""Close the SSH connection.""" """Close the SSH connection."""
if self._connection is not None: if self._connection:
self._connection.close() self._connection.close()
LOGGER.debug(f"Disconnected from {self.host}")
self._connection = None self._connection = None
def _blocking_connect(self, client): def _blocking_connect(self, client: paramiko.SSHClient):
"""Perform the blocking SSH connection using Paramiko.""" """Perform the blocking SSH connection using Paramiko."""
client.connect( client.connect(
self.host, hostname=self.host,
username=self.username, username=self.username,
password=self._password, password=self._password,
port=self.port port=self.port,
look_for_keys=False, # Set this to True if using private keys
allow_agent=False
) )
async def execute_command(self, command: str) -> CommandOutput: async def execute_command(self, command: str) -> CommandOutput:
"""Execute a command on the SSH server asynchronously.""" """Execute a command on the SSH server asynchronously."""
try: if not self.is_connection_alive():
stdin, stdout, stderr = self._connection.exec_command(command) LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
exit_status = stdout.channel.recv_exit_status() await self.connect()
try:
# Offload command execution to avoid blocking
loop = asyncio.get_running_loop()
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
exit_status = stdout.channel.recv_exit_status()
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode()) return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
except (paramiko.SSHException, EOFError) as exc: except (paramiko.SSHException, EOFError) as exc:
LOGGER.debug(f"Failed to execute command on {self.host}: {exc}") LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
return CommandOutput(command, -1, "", "") return CommandOutput(command, -1, "", "")
def is_connection_alive(self) -> bool: def is_connection_alive(self) -> bool:
"""Check if the connection is still alive asynchronously.""" """Check if the SSH connection is still alive."""
# use the code below if is_active() returns True
if self._connection is None: if self._connection is None:
return False return False
@ -83,5 +100,5 @@ class SSHClient:
self._connection.exec_command('ls', timeout=1) self._connection.exec_command('ls', timeout=1)
return True return True
except Exception: except Exception as e:
return False return False

View File

@ -1,74 +1,53 @@
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from typing import Any from typing import Any, Dict
import voluptuous as vol import voluptuous as vol
from homeassistant.components.switch import (SwitchEntity) from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ( from homeassistant.const import (
CONF_HOST, CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
CONF_MAC,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME, )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.helpers import (
device_registry as dr,
entity_platform,
) )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.helpers import entity_platform, device_registry as dr
from homeassistant.helpers.config_validation import make_entity_service_schema from homeassistant.helpers.config_validation import make_entity_service_schema
from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .computer import OSType, Computer from .computer import OSType, Computer
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ from .const import (
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
)
async def async_setup_entry( async def async_setup_entry(
hass: HomeAssistant, hass: HomeAssistant,
config: ConfigEntry, config: ConfigEntry,
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback
) -> None: ) -> None:
# Retrieve the data from the config flow """Set up the computer switch from a config entry."""
mac_address: str = config.data.get(CONF_MAC) mac_address = config.data[CONF_MAC]
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS) host = config.data[CONF_HOST]
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT) name = config.data[CONF_NAME]
host: str = config.data.get(CONF_HOST) dualboot = config.data.get("dualboot", False)
name: str = config.data.get(CONF_NAME) username = config.data[CONF_USERNAME]
dualboot: bool = config.data.get("dualboot") password = config.data[CONF_PASSWORD]
username: str = config.data.get(CONF_USERNAME) port = config.data.get(CONF_PORT)
password: str = config.data.get(CONF_PASSWORD)
port: int | None = config.data.get(CONF_PORT)
# Register the computer switch
async_add_entities( async_add_entities(
[ [ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
ComputerSwitch( True
hass,
name,
host,
mac_address,
dualboot,
username,
password,
port,
),
],
host is not None,
) )
platform = entity_platform.async_get_current_platform() platform = entity_platform.async_get_current_platform()
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse) # Service registrations
services = [ services = [
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE), (SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE), (SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
@ -85,18 +64,18 @@ async def async_setup_entry(
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY), (SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
] ]
# Register the services that depends on the switch # Register services with their schemas
for service in services: for service_name, schema, supports_response in services:
platform.async_register_entity_service( platform.async_register_entity_service(
service[0], service_name,
make_entity_service_schema(service[1]), make_entity_service_schema(schema),
service[0], service_name,
supports_response=service[2] supports_response=supports_response
) )
class ComputerSwitch(SwitchEntity): class ComputerSwitch(SwitchEntity):
"""Representation of a computer switch.""" """Representation of a computer switch entity."""
def __init__( def __init__(
self, self,
@ -104,28 +83,24 @@ class ComputerSwitch(SwitchEntity):
name: str, name: str,
host: str | None, host: str | None,
mac_address: str, mac_address: str,
dualboot: bool | False, dualboot: bool,
username: str, username: str,
password: str, password: str,
port: int | None, port: int | None,
) -> None: ) -> None:
"""Initialize the WOL switch.""" """Initialize the computer switch entity."""
self.hass = hass
self._hass = hass
self._attr_name = name self._attr_name = name
self._attr_unique_id = dr.format_mac(mac_address)
self._state = False
self._attr_should_poll = not self._attr_assumed_state
self._attr_extra_state_attributes = {}
self.computer = Computer(host, mac_address, username, password, port, dualboot) self.computer = Computer(host, mac_address, username, password, port, dualboot)
self._state = False
self._attr_assumed_state = host is None
self._attr_should_poll = bool(not self._attr_assumed_state)
self._attr_unique_id = dr.format_mac(mac_address)
self._attr_extra_state_attributes = {}
@property @property
def device_info(self) -> DeviceInfo | None: def device_info(self) -> DeviceInfo:
"""Return the device information.""" """Return device info for the registry."""
return DeviceInfo( return DeviceInfo(
identifiers={(DOMAIN, self.computer.mac)}, identifiers={(DOMAIN, self.computer.mac)},
name=self._attr_name, name=self._attr_name,
@ -141,16 +116,19 @@ class ComputerSwitch(SwitchEntity):
@property @property
def is_on(self) -> bool: def is_on(self) -> bool:
"""Return true if the computer is on."""
return self._state return self._state
async def turn_on(self, **kwargs: Any) -> None: async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the computer on using Wake-on-LAN."""
await self.computer.start() await self.computer.start()
if self._attr_assumed_state: if self._attr_assumed_state:
self._state = True self._state = True
self.async_write_ha_state() self.async_write_ha_state()
async def turn_off(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the computer off via shutdown command."""
await self.computer.shutdown() await self.computer.shutdown()
if self._attr_assumed_state: if self._attr_assumed_state:
@ -158,13 +136,16 @@ class ComputerSwitch(SwitchEntity):
self.async_write_ha_state() self.async_write_ha_state()
async def async_update(self) -> None: async def async_update(self) -> None:
"""Ping the computer to see if it is online and update the state.""" """Update the state by checking if the computer is on."""
self._state = await self.computer.is_on() is_on = await self.computer.is_on()
if self.is_on != is_on:
self._state = is_on
# self.async_write_ha_state()
await self.computer.update(self._state) # If the computer is on, update its attributes
if is_on:
await self.computer.update(is_on)
# Update the state attributes and the connection only if the computer is on
if self._state:
self._attr_extra_state_attributes = { self._attr_extra_state_attributes = {
"operating_system": self.computer.operating_system, "operating_system": self.computer.operating_system,
"operating_system_version": self.computer.operating_system_version, "operating_system_version": self.computer.operating_system_version,
@ -173,50 +154,50 @@ class ComputerSwitch(SwitchEntity):
"connected_devices": get_bluetooth_devices_as_str(self.computer), "connected_devices": get_bluetooth_devices_as_str(self.computer),
} }
# Services # Service methods for various functionalities
async def restart_to_windows_from_linux(self) -> None: async def restart_to_windows_from_linux(self) -> None:
"""(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" """Restart the computer from Linux to Windows."""
await self.computer.restart(OSType.LINUX, OSType.WINDOWS) await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
async def restart_to_linux_from_windows(self) -> None: async def restart_to_linux_from_windows(self) -> None:
"""(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting.""" """Restart the computer from Windows to Linux."""
await self.computer.restart() await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
async def put_computer_to_sleep(self) -> None: async def put_computer_to_sleep(self) -> None:
"""(Service Handler) Put the computer to sleep.""" """Put the computer to sleep."""
await self.computer.put_to_sleep() await self.computer.put_to_sleep()
async def start_computer_to_windows(self) -> None: async def start_computer_to_windows(self) -> None:
"""(Service Handler) Start the computer to Windows""" """Start the computer to Windows after booting into Linux first."""
await self.computer.start()
async def wait_task(): async def wait_and_reboot() -> None:
while not self.is_on: """Wait until the computer is on, then restart to Windows."""
while not await self.computer.is_on():
await asyncio.sleep(3) await asyncio.sleep(3)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS) await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart.""" self.hass.loop.create_task(wait_and_reboot())
await self.computer.start()
self._hass.loop.create_task(wait_task())
async def restart_computer(self) -> None: async def restart_computer(self) -> None:
"""(Service Handler) Restart the computer.""" """Restart the computer."""
await self.computer.restart() await self.computer.restart()
async def change_monitors_config(self, monitors_config: dict | None = None) -> None: async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""(Service Handler) Change the monitors configuration using a YAML config file.""" """Change the monitor configuration."""
await self.computer.set_monitors_config(monitors_config) await self.computer.set_monitors_config(monitors_config)
async def steam_big_picture(self, action: str) -> None: async def steam_big_picture(self, action: str) -> None:
"""(Service Handler) Control Steam Big Picture mode.""" """Control Steam Big Picture mode."""
await self.computer.steam_big_picture(action) await self.computer.steam_big_picture(action)
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, async def change_audio_config(
input_device: str | None = None, self, volume: int | None = None, mute: bool | None = None,
output_device: str | None = None) -> None: input_device: str | None = None, output_device: str | None = None
"""(Service Handler) Change the audio configuration using a YAML config file.""" ) -> None:
"""Change the audio configuration."""
await self.computer.set_audio_config(volume, mute, input_device, output_device) await self.computer.set_audio_config(volume, mute, input_device, output_device)
async def debug_info(self) -> ServiceResponse: async def debug_info(self) -> ServiceResponse:
"""(Service Handler) Prints debug info.""" """Return debug information."""
return await format_debug_information(self.computer) return await format_debug_information(self.computer)