Compare commits
2 Commits
59ff04775c
...
f82826fc0a
Author | SHA1 | Date | |
---|---|---|---|
f82826fc0a | |||
67df62ff91 |
@ -34,94 +34,83 @@ class Computer:
|
||||
|
||||
self._connection: SSHClient = SSHClient(host, username, password, port)
|
||||
asyncio.create_task(self._connection.connect(computer=self))
|
||||
# asyncio.create_task(self.update())
|
||||
|
||||
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
|
||||
"""Update computer details."""
|
||||
if not state or not await self.is_on():
|
||||
LOGGER.debug("Computer is off, skipping update")
|
||||
return
|
||||
|
||||
async def update_operating_system():
|
||||
# Ensure connection is established before updating
|
||||
await self._ensure_connection_alive(timeout)
|
||||
|
||||
# Update tasks
|
||||
await asyncio.gather(
|
||||
self._update_operating_system(),
|
||||
self._update_operating_system_version(),
|
||||
self._update_desktop_environment(),
|
||||
self._update_windows_entry_grub(),
|
||||
self._update_monitors_config(),
|
||||
self._update_audio_config(),
|
||||
self._update_bluetooth_devices()
|
||||
)
|
||||
|
||||
async def _ensure_connection_alive(self, timeout: int) -> None:
|
||||
"""Ensure the SSH connection is alive, reconnect if necessary."""
|
||||
for _ in range(timeout * 4):
|
||||
if self._connection.is_connection_alive():
|
||||
return
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Reconnecting to {self.host}")
|
||||
await self._connection.connect()
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
|
||||
raise ConnectionError("SSH connection could not be re-established")
|
||||
|
||||
async def _update_operating_system(self) -> None:
|
||||
"""Update the operating system information."""
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
async def update_operating_system_version():
|
||||
async def _update_operating_system_version(self) -> None:
|
||||
"""Update the operating system version."""
|
||||
self.operating_system_version = (await self.run_action("operating_system_version")).output
|
||||
|
||||
async def update_desktop_environment():
|
||||
async def _update_desktop_environment(self) -> None:
|
||||
"""Update the desktop environment information."""
|
||||
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
|
||||
|
||||
async def update_windows_entry_grub():
|
||||
async def _update_windows_entry_grub(self) -> None:
|
||||
"""Update Windows entry in GRUB (if applicable)."""
|
||||
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
|
||||
|
||||
async def update_monitors_config():
|
||||
monitors_config = (await self.run_action("get_monitors_config")).output
|
||||
async def _update_monitors_config(self) -> None:
|
||||
"""Update monitors configuration."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
# TODO: add compatibility for KDE/others
|
||||
self.monitors_config = parse_gnome_monitors_output(monitors_config)
|
||||
elif self.operating_system == OSType.WINDOWS:
|
||||
# TODO: implement for Windows
|
||||
pass
|
||||
output = (await self.run_action("get_monitors_config")).output
|
||||
self.monitors_config = parse_gnome_monitors_output(output)
|
||||
# TODO: Implement for Windows if needed
|
||||
|
||||
async def update_audio_config():
|
||||
speakers_config = (await self.run_action("get_speakers")).output
|
||||
microphones_config = (await self.run_action("get_microphones")).output
|
||||
async def _update_audio_config(self) -> None:
|
||||
"""Update audio configuration."""
|
||||
speakers_output = (await self.run_action("get_speakers")).output
|
||||
microphones_output = (await self.run_action("get_microphones")).output
|
||||
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.audio_config = parse_pactl_output(speakers_config, microphones_config)
|
||||
elif self.operating_system == OSType.WINDOWS:
|
||||
# TODO: implement for Windows
|
||||
pass
|
||||
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def update_bluetooth_devices():
|
||||
bluetooth_config = await self.run_action("get_bluetooth_devices")
|
||||
async def _update_bluetooth_devices(self) -> None:
|
||||
"""Update Bluetooth devices list."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.bluetooth_devices = parse_bluetoothctl(bluetooth_config)
|
||||
elif self.operating_system == OSType.WINDOWS:
|
||||
# TODO: implement for Windows
|
||||
pass
|
||||
|
||||
if not state or not await self.is_on():
|
||||
LOGGER.debug("ECM Computer is off, skipping update")
|
||||
return
|
||||
else:
|
||||
# Wait for connection to be established before updating or give up after timeout
|
||||
for i in range(timeout * 4):
|
||||
if not self._connection.is_connection_alive():
|
||||
await asyncio.sleep(0.25)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
# Reconnect if connection is lost and init is already done
|
||||
if self.initialized:
|
||||
await self._connection.connect()
|
||||
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(
|
||||
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
|
||||
return
|
||||
else:
|
||||
LOGGER.debug(
|
||||
f"Computer seems ON but the SSH connection is dead (timeout={timeout}s), skipping update")
|
||||
return
|
||||
|
||||
await update_operating_system()
|
||||
await update_operating_system_version()
|
||||
await update_desktop_environment()
|
||||
await update_windows_entry_grub()
|
||||
await update_monitors_config()
|
||||
await update_audio_config()
|
||||
await update_bluetooth_devices()
|
||||
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def _detect_operating_system(self) -> OSType:
|
||||
"""Detect the operating system of the computer."""
|
||||
uname_result = await self.run_manually("uname")
|
||||
return OSType.LINUX if uname_result.successful() else OSType.WINDOWS
|
||||
|
||||
def is_windows(self) -> bool:
|
||||
"""Check if the computer is running Windows."""
|
||||
return self.operating_system == OSType.WINDOWS
|
||||
|
||||
def is_linux(self) -> bool:
|
||||
"""Check if the computer is running Linux."""
|
||||
return self.operating_system == OSType.LINUX
|
||||
"""Detect the operating system by running a uname command."""
|
||||
result = await self.run_manually("uname")
|
||||
return OSType.LINUX if result.successful() else OSType.WINDOWS
|
||||
|
||||
async def is_on(self, timeout: int = 1) -> bool:
|
||||
"""Check if the computer is on by pinging it."""
|
||||
@ -129,20 +118,20 @@ class Computer:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*ping_cmd,
|
||||
stdout=asyncio.subprocess.DEVNULL,
|
||||
stderr=asyncio.subprocess.DEVNULL,
|
||||
stderr=asyncio.subprocess.DEVNULL
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the computer."""
|
||||
"""Start the computer using Wake-on-LAN."""
|
||||
send_magic_packet(self.mac)
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""Shutdown the computer."""
|
||||
await self.run_action("shutdown")
|
||||
|
||||
async def restart(self, from_os: OSType = None, to_os: OSType = None) -> None:
|
||||
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
|
||||
"""Restart the computer."""
|
||||
await self.run_action("restart")
|
||||
|
||||
@ -151,88 +140,68 @@ class Computer:
|
||||
await self.run_action("sleep")
|
||||
|
||||
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Change the monitors configuration."""
|
||||
if self.is_linux():
|
||||
# TODO: other DE support
|
||||
if self.desktop_environment == 'gnome':
|
||||
await self.run_action("set_monitors_config",
|
||||
params={"args": format_gnome_monitors_args(monitors_config)})
|
||||
"""Set monitors configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
|
||||
|
||||
async def set_audio_config(self, volume: int | None = None, mute: bool | None = None,
|
||||
input_device: str | None = None,
|
||||
output_device: str | None = None) -> None:
|
||||
"""Change the audio configuration."""
|
||||
if self.is_linux():
|
||||
# TODO: other DE support
|
||||
if self.desktop_environment == 'gnome':
|
||||
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
|
||||
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
|
||||
"""Set audio configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
|
||||
for command in pactl_commands:
|
||||
await self.run_action("set_audio_config", params={"args": command})
|
||||
|
||||
async def install_nircmd(self) -> None:
|
||||
"""Install NirCmd tool (Windows specific)."""
|
||||
await self.run_action("install_nircmd", params={"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
|
||||
"install_path": f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"})
|
||||
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
|
||||
await self.run_action("install_nircmd", params={
|
||||
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
|
||||
"install_path": install_path
|
||||
})
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""Start, stop or exit Steam Big Picture mode."""
|
||||
"""Start, stop, or exit Steam Big Picture mode."""
|
||||
await self.run_action(f"{action}_steam_big_picture")
|
||||
|
||||
async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput:
|
||||
"""Run a predefined command via SSH."""
|
||||
if params is None:
|
||||
params = {}
|
||||
if id not in const.ACTIONS:
|
||||
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
|
||||
raise_on_error: bool = False) -> CommandOutput:
|
||||
"""Run a predefined action via SSH."""
|
||||
params = params or {}
|
||||
|
||||
action = const.ACTIONS.get(id)
|
||||
if not action:
|
||||
LOGGER.error(f"Action {id} not found.")
|
||||
return CommandOutput("", 1, "", "Action not found")
|
||||
|
||||
action = const.ACTIONS[id]
|
||||
if not self.operating_system:
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
if self.operating_system.lower() in action:
|
||||
raise_on_error = action.get("raise_on_error", raise_on_error)
|
||||
os_commands = action.get(self.operating_system.lower())
|
||||
if not os_commands:
|
||||
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
|
||||
|
||||
os_data = action.get(self.operating_system.lower())
|
||||
if isinstance(os_data, list):
|
||||
commands = os_data
|
||||
req_params = []
|
||||
elif isinstance(os_data, dict):
|
||||
if "command" in os_data or "commands" in os_data:
|
||||
commands = os_data.get("commands", [os_data.get("command")])
|
||||
req_params = os_data.get("params", [])
|
||||
raise_on_error = os_data.get("raise_on_error", raise_on_error)
|
||||
elif self.desktop_environment in os_data:
|
||||
commands = os_data.get(self.desktop_environment).get("commands", [
|
||||
os_data.get(self.desktop_environment).get("command")])
|
||||
req_params = os_data.get(self.desktop_environment).get("params", [])
|
||||
raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error)
|
||||
else:
|
||||
raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}")
|
||||
else:
|
||||
raise ValueError(f"Action {id} misconfigured/bad format")
|
||||
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
|
||||
[os_commands.get("command")])
|
||||
required_params = []
|
||||
if "params" in os_commands:
|
||||
required_params = os_commands.get("params", [])
|
||||
|
||||
if sorted(req_params) != sorted(params.keys()):
|
||||
# Validate parameters
|
||||
if sorted(required_params) != sorted(params.keys()):
|
||||
raise ValueError(f"Invalid/missing parameters for action: {id}")
|
||||
|
||||
command_result = None
|
||||
for command in commands:
|
||||
for param, value in params.items():
|
||||
command = command.replace(f"%{param}%", value)
|
||||
command = command.replace(f"%{param}%", str(value))
|
||||
|
||||
command_result = await self.run_manually(command)
|
||||
if command_result.successful():
|
||||
LOGGER.debug(f"Command successful: {command}")
|
||||
return command_result
|
||||
else:
|
||||
LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}")
|
||||
# LOGGER.debug(f"Output: {command_result.output}")
|
||||
# LOGGER.debug(f"Error: {command_result.error}")
|
||||
|
||||
if raise_on_error:
|
||||
result = await self.run_manually(command)
|
||||
if result.successful():
|
||||
return result
|
||||
elif raise_on_error:
|
||||
raise ValueError(f"Command failed: {command}")
|
||||
|
||||
return command_result
|
||||
|
||||
else:
|
||||
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
|
||||
return result
|
||||
|
||||
async def run_manually(self, command: str) -> CommandOutput:
|
||||
"""Run a custom command manually via SSH."""
|
||||
|
@ -8,71 +8,88 @@ from custom_components.easy_computer_manager.computer import CommandOutput
|
||||
|
||||
|
||||
class SSHClient:
|
||||
def __init__(self, host, username, password, port):
|
||||
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self._password = password
|
||||
self.port = port
|
||||
self._connection = None
|
||||
self._connection: Optional[paramiko.SSHClient] = None
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||
self.disconnect()
|
||||
|
||||
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
|
||||
"""Open an SSH connection using Paramiko asynchronously."""
|
||||
self.disconnect()
|
||||
if self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is already active.")
|
||||
return
|
||||
|
||||
self.disconnect() # Ensure any previous connection is closed
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
try:
|
||||
# Create the SSH client
|
||||
client = paramiko.SSHClient()
|
||||
|
||||
# Set missing host key policy to automatically accept unknown host keys
|
||||
# client.load_system_host_keys()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
try:
|
||||
# Offload the blocking connect call to a thread
|
||||
await loop.run_in_executor(None, self._blocking_connect, client)
|
||||
self._connection = client
|
||||
LOGGER.debug(f"Connected to {self.host}")
|
||||
|
||||
except (OSError, paramiko.SSHException) as exc:
|
||||
if retried:
|
||||
await self.connect(retried=True)
|
||||
else:
|
||||
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
|
||||
if not retried:
|
||||
LOGGER.debug(f"Retrying connection to {self.host}...")
|
||||
await self.connect(retried=True) # Retry only once
|
||||
|
||||
finally:
|
||||
if computer is not None:
|
||||
if hasattr(computer, "initialized"):
|
||||
if computer is not None and hasattr(computer, "initialized"):
|
||||
computer.initialized = True
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close the SSH connection."""
|
||||
if self._connection is not None:
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
LOGGER.debug(f"Disconnected from {self.host}")
|
||||
self._connection = None
|
||||
|
||||
def _blocking_connect(self, client):
|
||||
def _blocking_connect(self, client: paramiko.SSHClient):
|
||||
"""Perform the blocking SSH connection using Paramiko."""
|
||||
client.connect(
|
||||
self.host,
|
||||
hostname=self.host,
|
||||
username=self.username,
|
||||
password=self._password,
|
||||
port=self.port
|
||||
port=self.port,
|
||||
look_for_keys=False, # Set this to True if using private keys
|
||||
allow_agent=False
|
||||
)
|
||||
|
||||
async def execute_command(self, command: str) -> CommandOutput:
|
||||
"""Execute a command on the SSH server asynchronously."""
|
||||
try:
|
||||
stdin, stdout, stderr = self._connection.exec_command(command)
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
if not self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
|
||||
await self.connect()
|
||||
|
||||
try:
|
||||
# Offload command execution to avoid blocking
|
||||
loop = asyncio.get_running_loop()
|
||||
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
|
||||
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
|
||||
|
||||
except (paramiko.SSHException, EOFError) as exc:
|
||||
LOGGER.debug(f"Failed to execute command on {self.host}: {exc}")
|
||||
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
|
||||
return CommandOutput(command, -1, "", "")
|
||||
|
||||
def is_connection_alive(self) -> bool:
|
||||
"""Check if the connection is still alive asynchronously."""
|
||||
# use the code below if is_active() returns True
|
||||
"""Check if the SSH connection is still alive."""
|
||||
if self._connection is None:
|
||||
return False
|
||||
|
||||
@ -83,5 +100,5 @@ class SSHClient:
|
||||
self._connection.exec_command('ls', timeout=1)
|
||||
return True
|
||||
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
return False
|
||||
|
@ -1,74 +1,53 @@
|
||||
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from typing import Any, Dict
|
||||
|
||||
import voluptuous as vol
|
||||
from homeassistant.components.switch import (SwitchEntity)
|
||||
from homeassistant.components.switch import SwitchEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_MAC,
|
||||
CONF_NAME,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME, )
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.helpers import (
|
||||
device_registry as dr,
|
||||
entity_platform,
|
||||
CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.helpers import entity_platform, device_registry as dr
|
||||
from homeassistant.helpers.config_validation import make_entity_service_schema
|
||||
from homeassistant.helpers.device_registry import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .computer import OSType, Computer
|
||||
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
|
||||
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
||||
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
||||
from .const import (
|
||||
DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
|
||||
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
|
||||
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
# Retrieve the data from the config flow
|
||||
mac_address: str = config.data.get(CONF_MAC)
|
||||
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS)
|
||||
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT)
|
||||
host: str = config.data.get(CONF_HOST)
|
||||
name: str = config.data.get(CONF_NAME)
|
||||
dualboot: bool = config.data.get("dualboot")
|
||||
username: str = config.data.get(CONF_USERNAME)
|
||||
password: str = config.data.get(CONF_PASSWORD)
|
||||
port: int | None = config.data.get(CONF_PORT)
|
||||
"""Set up the computer switch from a config entry."""
|
||||
mac_address = config.data[CONF_MAC]
|
||||
host = config.data[CONF_HOST]
|
||||
name = config.data[CONF_NAME]
|
||||
dualboot = config.data.get("dualboot", False)
|
||||
username = config.data[CONF_USERNAME]
|
||||
password = config.data[CONF_PASSWORD]
|
||||
port = config.data.get(CONF_PORT)
|
||||
|
||||
# Register the computer switch
|
||||
async_add_entities(
|
||||
[
|
||||
ComputerSwitch(
|
||||
hass,
|
||||
name,
|
||||
host,
|
||||
mac_address,
|
||||
dualboot,
|
||||
username,
|
||||
password,
|
||||
port,
|
||||
),
|
||||
],
|
||||
host is not None,
|
||||
[ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
|
||||
True
|
||||
)
|
||||
|
||||
platform = entity_platform.async_get_current_platform()
|
||||
|
||||
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse)
|
||||
# Service registrations
|
||||
services = [
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
|
||||
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
|
||||
@ -85,18 +64,18 @@ async def async_setup_entry(
|
||||
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
|
||||
]
|
||||
|
||||
# Register the services that depends on the switch
|
||||
for service in services:
|
||||
# Register services with their schemas
|
||||
for service_name, schema, supports_response in services:
|
||||
platform.async_register_entity_service(
|
||||
service[0],
|
||||
make_entity_service_schema(service[1]),
|
||||
service[0],
|
||||
supports_response=service[2]
|
||||
service_name,
|
||||
make_entity_service_schema(schema),
|
||||
service_name,
|
||||
supports_response=supports_response
|
||||
)
|
||||
|
||||
|
||||
class ComputerSwitch(SwitchEntity):
|
||||
"""Representation of a computer switch."""
|
||||
"""Representation of a computer switch entity."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -104,28 +83,24 @@ class ComputerSwitch(SwitchEntity):
|
||||
name: str,
|
||||
host: str | None,
|
||||
mac_address: str,
|
||||
dualboot: bool | False,
|
||||
dualboot: bool,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int | None,
|
||||
) -> None:
|
||||
"""Initialize the WOL switch."""
|
||||
|
||||
self._hass = hass
|
||||
"""Initialize the computer switch entity."""
|
||||
self.hass = hass
|
||||
self._attr_name = name
|
||||
self._attr_unique_id = dr.format_mac(mac_address)
|
||||
self._state = False
|
||||
self._attr_should_poll = not self._attr_assumed_state
|
||||
self._attr_extra_state_attributes = {}
|
||||
|
||||
self.computer = Computer(host, mac_address, username, password, port, dualboot)
|
||||
|
||||
self._state = False
|
||||
self._attr_assumed_state = host is None
|
||||
self._attr_should_poll = bool(not self._attr_assumed_state)
|
||||
self._attr_unique_id = dr.format_mac(mac_address)
|
||||
self._attr_extra_state_attributes = {}
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo | None:
|
||||
"""Return the device information."""
|
||||
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return device info for the registry."""
|
||||
return DeviceInfo(
|
||||
identifiers={(DOMAIN, self.computer.mac)},
|
||||
name=self._attr_name,
|
||||
@ -141,16 +116,19 @@ class ComputerSwitch(SwitchEntity):
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
"""Return true if the computer is on."""
|
||||
return self._state
|
||||
|
||||
async def turn_on(self, **kwargs: Any) -> None:
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer on using Wake-on-LAN."""
|
||||
await self.computer.start()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
self._state = True
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def turn_off(self, **kwargs: Any) -> None:
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer off via shutdown command."""
|
||||
await self.computer.shutdown()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
@ -158,13 +136,16 @@ class ComputerSwitch(SwitchEntity):
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_update(self) -> None:
|
||||
"""Ping the computer to see if it is online and update the state."""
|
||||
self._state = await self.computer.is_on()
|
||||
"""Update the state by checking if the computer is on."""
|
||||
is_on = await self.computer.is_on()
|
||||
if self.is_on != is_on:
|
||||
self._state = is_on
|
||||
# self.async_write_ha_state()
|
||||
|
||||
await self.computer.update(self._state)
|
||||
# If the computer is on, update its attributes
|
||||
if is_on:
|
||||
await self.computer.update(is_on)
|
||||
|
||||
# Update the state attributes and the connection only if the computer is on
|
||||
if self._state:
|
||||
self._attr_extra_state_attributes = {
|
||||
"operating_system": self.computer.operating_system,
|
||||
"operating_system_version": self.computer.operating_system_version,
|
||||
@ -173,50 +154,50 @@ class ComputerSwitch(SwitchEntity):
|
||||
"connected_devices": get_bluetooth_devices_as_str(self.computer),
|
||||
}
|
||||
|
||||
# Services
|
||||
# Service methods for various functionalities
|
||||
async def restart_to_windows_from_linux(self) -> None:
|
||||
"""(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
|
||||
"""Restart the computer from Linux to Windows."""
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
async def restart_to_linux_from_windows(self) -> None:
|
||||
"""(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
|
||||
await self.computer.restart()
|
||||
"""Restart the computer from Windows to Linux."""
|
||||
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
|
||||
|
||||
async def put_computer_to_sleep(self) -> None:
|
||||
"""(Service Handler) Put the computer to sleep."""
|
||||
"""Put the computer to sleep."""
|
||||
await self.computer.put_to_sleep()
|
||||
|
||||
async def start_computer_to_windows(self) -> None:
|
||||
"""(Service Handler) Start the computer to Windows"""
|
||||
"""Start the computer to Windows after booting into Linux first."""
|
||||
await self.computer.start()
|
||||
|
||||
async def wait_task():
|
||||
while not self.is_on:
|
||||
async def wait_and_reboot() -> None:
|
||||
"""Wait until the computer is on, then restart to Windows."""
|
||||
while not await self.computer.is_on():
|
||||
await asyncio.sleep(3)
|
||||
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
|
||||
await self.computer.start()
|
||||
self._hass.loop.create_task(wait_task())
|
||||
self.hass.loop.create_task(wait_and_reboot())
|
||||
|
||||
async def restart_computer(self) -> None:
|
||||
"""(Service Handler) Restart the computer."""
|
||||
"""Restart the computer."""
|
||||
await self.computer.restart()
|
||||
|
||||
async def change_monitors_config(self, monitors_config: dict | None = None) -> None:
|
||||
"""(Service Handler) Change the monitors configuration using a YAML config file."""
|
||||
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Change the monitor configuration."""
|
||||
await self.computer.set_monitors_config(monitors_config)
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""(Service Handler) Control Steam Big Picture mode."""
|
||||
"""Control Steam Big Picture mode."""
|
||||
await self.computer.steam_big_picture(action)
|
||||
|
||||
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
|
||||
input_device: str | None = None,
|
||||
output_device: str | None = None) -> None:
|
||||
"""(Service Handler) Change the audio configuration using a YAML config file."""
|
||||
async def change_audio_config(
|
||||
self, volume: int | None = None, mute: bool | None = None,
|
||||
input_device: str | None = None, output_device: str | None = None
|
||||
) -> None:
|
||||
"""Change the audio configuration."""
|
||||
await self.computer.set_audio_config(volume, mute, input_device, output_device)
|
||||
|
||||
async def debug_info(self) -> ServiceResponse:
|
||||
"""(Service Handler) Prints debug info."""
|
||||
"""Return debug information."""
|
||||
return await format_debug_information(self.computer)
|
||||
|
Loading…
Reference in New Issue
Block a user