start working on massive refactor (v2)

This commit is contained in:
Mathieu Broillet 2024-08-25 23:20:39 +02:00
parent e5b6af8e41
commit ab2c8e0376
Signed by: mathieu
GPG Key ID: A08E484FE95074C1
7 changed files with 307 additions and 221 deletions

View File

@ -59,11 +59,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(
entry, "switch"
)
)
await hass.config_entries.async_forward_entry_setups(entry, ["switch"])
return True

View File

@ -0,0 +1,182 @@
import subprocess as sp
import paramiko
import wakeonlan
from custom_components.easy_computer_manager import const, _LOGGER
class OSType:
WINDOWS = "Windows"
LINUX = "Linux"
MACOS = "MacOS"
class Computer:
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
dualboot: bool = False) -> None:
"""Init computer."""
self.host = host
self.mac = mac
self._username = username
self._password = password
self._port = port
self._dualboot = dualboot
self._operating_system = None
self._operating_system_version = None
self._windows_entry_grub = None
self._monitors_config = None
self._audio_config = None
self._bluetooth_devices = None
self.setup()
async def _open_ssh_connection(self):
"""Open an SSH connection."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(self.host, port=self._port, username=self._username, password=self._password)
return client
def _close_ssh_connection(self, client):
"""Close the SSH connection."""
if client:
client.close()
def setup(self):
"""Setup method that opens an SSH connection and keeps it open for subsequent setup commands."""
client = self._open_ssh_connection()
# TODO: run commands here
self._operating_system = OSType.LINUX if self.run_manually(
"uname").return_code == 0 else OSType.WINDOWS # TODO: improve this
self._operating_system_version = self.run_action("operating_system_version").output
self._windows_entry_grub = self.run_action("get_windows_entry_grub").output
self._monitors_config = {}
self._audio_config = {'speakers': None, 'microphones': None}
self._bluetooth_devices = {}
self._close_ssh_connection(client)
# Getters
def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on (ping)."""
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
return status == 0
def get_operating_system(self) -> str:
"""Get the operating system of the computer."""
return self._operating_system
def get_operating_system_version(self) -> str:
"""Get the operating system version of the computer."""
return self._operating_system_version
def get_windows_entry_grub(self) -> str:
"""Get the Windows entry in the GRUB configuration file."""
return self._windows_entry_grub
def get_monitors_config(self) -> str:
"""Get the monitors configuration of the computer."""
return self._monitors_config
def get_speakers(self) -> str:
"""Get the audio configuration of the computer."""
return self._audio_config.speakers
def get_microphones(self) -> str:
"""Get the audio configuration of the computer."""
return self._audio_config.microphones
def get_bluetooth_devices(self, as_str: bool = False) -> str:
"""Get the Bluetooth devices of the computer."""
return self._bluetooth_devices
def is_dualboot(self) -> bool:
"""Check if the computer is dualboot."""
return self._dualboot
# Actions
def start(self) -> None:
"""Start the computer."""
wakeonlan.send_magic_packet(self.mac)
def shutdown(self) -> None:
"""Shutdown the computer."""
self.run_action("shutdown")
def restart(self, from_os: OSType = None, to_os: OSType = None) -> None:
"""Restart the computer."""
self.run_action("restart")
def put_to_sleep(self) -> None:
"""Put the computer to sleep."""
self.run_action("sleep")
def change_monitors_config(self, monitors_config: dict) -> None:
pass
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
output_device: str | None = None) -> None:
pass
def install_nircmd(self) -> None:
pass
def start_steam_big_picture(self) -> None:
pass
def stop_steam_big_picture(self) -> None:
pass
def exit_steam_big_picture(self) -> None:
pass
def run_action(self, command: str, params=None) -> {}:
"""Run a command via SSH. Opens a new connection for each command."""
if params is None:
params = {}
if command not in const.COMMANDS:
_LOGGER.error(f"Invalid command: {command}")
return
command_template = const.COMMANDS[command]
# Check if the command has the required parameters
if "params" in command_template:
if sorted(command_template.params) != sorted(params.keys()):
raise ValueError("Invalid parameters")
# Check if the command is available for the operating system
match self._operating_system:
case OSType.WINDOWS:
command = command_template[OSType.WINDOWS]
case OSType.LINUX:
command = command_template[OSType.LINUX]
case _:
raise ValueError("Invalid operating system")
# Replace the parameters in the command
for param in params:
command = command.replace(f"%{param}%", params[param])
# Open SSH connection, execute command, and close connection
client = self._open_ssh_connection()
stdin, stdout, stderr = client.exec_command(command)
print(stdout.read().decode()) # Print the command output for debugging
self._close_ssh_connection(client)
return {"output": stdout.read().decode(), "error": stderr.read().decode(),
"return_code": stdout.channel.recv_exit_status()}
def run_manually(self, command: str) -> {}:
"""Run a command manually (not from predefined commands)."""
client = self._open_ssh_connection()
stdin, stdout, stderr = client.exec_command(command)
print(stdout.read().decode()) # Print the command output for debugging
self._close_ssh_connection(client)
return {"output": stdout.read().decode(), "error": stderr.read().decode(),
"return_code": stdout.channel.recv_exit_status()}

View File

@ -0,0 +1,31 @@
from custom_components.easy_computer_manager.computer import Computer
def get_debug_info(computer: Computer):
"""Return debug information about the host system."""
data = {
'os': {
'name': computer.get_operating_system(),
'version': computer.get_operating_system_version(),
},
'connection':{
'host': computer.host,
'mac': computer.mac,
'username': computer._username,
'port': computer._port,
'dualboot': computer._dualboot,
'is_on': computer.is_on()
},
'grub':{
'windows_entry': computer.get_windows_entry_grub()
},
'audio':{
'speakers': computer.get_speakers(),
'microphones': computer.get_microphones()
},
'monitors': computer.get_monitors_config(),
'bluetooth_devices': computer.get_bluetooth_devices()
}
return data

View File

@ -9,7 +9,7 @@ from homeassistant import config_entries, exceptions
from homeassistant.core import HomeAssistant
from paramiko.ssh_exception import AuthenticationException
from . import utils
from .computer import Computer
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -40,6 +40,8 @@ class Hub:
self._name = host
self._id = host.lower()
self.computer = Computer(host, "", username, password, port)
@property
def hub_id(self) -> str:
"""ID for dummy."""
@ -48,8 +50,10 @@ class Hub:
async def test_connection(self) -> bool:
"""Test connectivity to the computer is OK."""
try:
return utils.test_connection(
utils.create_ssh_connection(self._host, self._username, self._password, self._port))
# TODO: check if reachable
_LOGGER.info("Testing connection to %s", self._host)
return True
except AuthenticationException:
return False
@ -63,6 +67,7 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]:
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
_LOGGER.info("Validating configuration")
if not await hub.test_connection():
raise CannotConnect

View File

@ -11,3 +11,34 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config"
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
SERVICE_DEBUG_INFO = "debug_info"
COMMANDS = {
"operating_system": {
"linux": ["uname"]
},
"operating_system_version": {
"windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'],
"linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"]
},
"shutdown": {
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"],
"linux": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"]
},
"restart": {
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"],
"linux": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"]
},
"sleep": {
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"],
"linux": ["sudo systemctl suspend", "sudo pm-suspend"]
},
"get_windows_entry_grub": {
"linux": ["sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"]
},
"set_grub_entry": {
"params": ["grub-entry"],
"linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"]
}
}

View File

@ -4,11 +4,9 @@ from __future__ import annotations
import asyncio
import logging
import subprocess as sp
from typing import Any
import voluptuous as vol
import wakeonlan
from homeassistant.components.switch import (SwitchEntity)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@ -27,9 +25,9 @@ from homeassistant.helpers import (
from homeassistant.helpers.config_validation import make_entity_service_schema
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from paramiko.ssh_exception import AuthenticationException
from . import utils
from . import utils, computer_utils
from .computer import Computer, OSType
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
@ -110,8 +108,6 @@ class ComputerSwitch(SwitchEntity):
name: str,
host: str | None,
mac_address: str,
# broadcast_address: str | None,
# broadcast_port: int | None,
dualboot: bool | False,
username: str,
password: str,
@ -121,205 +117,115 @@ class ComputerSwitch(SwitchEntity):
self._hass = hass
self._attr_name = name
self._host = host
self._mac_address = mac_address
# self._broadcast_address = broadcast_address
# self._broadcast_port = broadcast_port
self._dualboot = dualboot
self._username = username
self._password = password
self._port = port
self.computer = Computer(host, mac_address, username, password, port, dualboot)
self._state = False
self._attr_assumed_state = host is None
self._attr_should_poll = bool(not self._attr_assumed_state)
self._attr_unique_id = dr.format_mac(mac_address)
self._attr_extra_state_attributes = {}
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
@property
def device_info(self) -> DeviceInfo | None:
"""Return the device information."""
if self._host is None:
return None
# TODO: remove this?
# if self._host is None:
# return None
return DeviceInfo(
identifiers={(DOMAIN, self._mac_address)},
identifiers={(DOMAIN, self.computer.mac)},
name=self._attr_name,
manufacturer="Generic",
model="Computer",
sw_version=utils.get_operating_system_version(
self._connection) if self._state else "Unknown",
connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
sw_version=self.computer.get_operating_system_version(),
connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)},
)
@property
def icon(self) -> str:
"""Return the icon to use in the frontend, if any."""
return "mdi:monitor" if self._state else "mdi:monitor-off"
@property
def is_on(self) -> bool:
"""Return true if the computer switch is on."""
return self._state
def turn_on(self, **kwargs: Any) -> None:
"""Turn the computer on using wake on lan."""
service_kwargs: dict[str, Any] = {}
# if self._broadcast_address is not None:
# service_kwargs["ip_address"] = self._broadcast_address
# if self._broadcast_port is not None:
# service_kwargs["port"] = self._broadcast_port
_LOGGER.debug(
"Send magic packet to mac %s (broadcast: %s, port: %s)",
self._mac_address,
# self._broadcast_address,
# self._broadcast_port,
)
wakeonlan.send_magic_packet(self._mac_address, **service_kwargs)
self.computer.start()
if self._attr_assumed_state:
self._state = True
self.async_write_ha_state()
def turn_off(self, **kwargs: Any) -> None:
"""Turn the computer off using appropriate shutdown command based on running OS and/or distro."""
utils.shutdown_system(self._connection)
self.computer.shutdown()
if self._attr_assumed_state:
self._state = False
self.async_write_ha_state()
# Services
def restart_to_windows_from_linux(self) -> None:
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
if self._dualboot:
utils.restart_to_windows_from_linux(self._connection)
else:
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
self.computer.restart(OSType.LINUX, OSType.WINDOWS)
def restart_to_linux_from_windows(self) -> None:
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
if self._dualboot:
# TODO: check for default grub entry and adapt accordingly
utils.restart_system(self._connection)
else:
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
self.computer.restart()
def put_computer_to_sleep(self) -> None:
"""Put the computer to sleep using appropriate sleep command based on running OS and/or distro."""
utils.sleep_system(self._connection)
self.computer.setup()
def start_computer_to_windows(self) -> None:
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
self.turn_on()
if self._dualboot:
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
self._hass.loop.create_task(self.service_restart_to_windows_from_linux())
else:
_LOGGER.error(
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
"correctly in the UI.",
self._host)
async def service_restart_to_windows_from_linux(self) -> None:
"""Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows."""
async def wait_task():
while not self.is_on:
await asyncio.sleep(3)
pass
# await asyncio.sleep(3)
await utils.restart_to_windows_from_linux(self._connection)
self.computer.restart(OSType.LINUX, OSType.WINDOWS)
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
self.computer.start()
self._hass.loop.create_task(wait_task())
def restart_computer(self) -> None:
"""Restart the computer using appropriate restart command based on running OS and/or distro."""
# TODO: check for default grub entry and adapt accordingly
if self._dualboot and not utils.is_unix_system(connection=self._connection):
utils.restart_system(self._connection)
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
self.restart_to_windows_from_linux()
else:
utils.restart_system(self._connection)
self.computer.restart()
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""Change the monitors configuration using a YAML config file."""
if monitors_config is not None and len(monitors_config) > 0:
utils.change_monitors_config(self._connection, monitors_config)
else:
raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.")
self.computer.change_monitors_config(monitors_config)
def steam_big_picture(self, action: str) -> None:
"""Controls Steam Big Picture mode."""
match action:
case "start":
self.computer.start_steam_big_picture()
case "stop":
self.computer.stop_steam_big_picture()
case "exit":
self.computer.exit_steam_big_picture()
if action is not None:
utils.steam_big_picture(self._connection, action)
else:
raise HomeAssistantError("The 'action' parameter must be specified.")
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
output_device: str | None = None) -> None:
"""Change the audio configuration using a YAML config file."""
utils.change_audio_config(self._connection, volume, mute, input_device, output_device)
def update(self) -> None:
"""Ping the computer to see if it is online and update the state."""
timeout = 1
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)]
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
self._state = not bool(status)
# Update the state attributes and the connection only if the computer is on
if self._state:
if self._connection is None or not utils.test_connection(self._connection):
self.renew_ssh_connection()
if not self._state:
return
self._attr_extra_state_attributes = {
"operating_system": utils.get_operating_system(self._connection),
"operating_system_version": utils.get_operating_system_version(self._connection),
"mac_address": self._mac_address,
"ip_address": self._host,
"connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True),
}
def renew_ssh_connection(self) -> None:
"""Renew the SSH connection."""
_LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username)
if self._connection is not None:
self._connection.close()
try:
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
self._connection.open()
except AuthenticationException as error:
_LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error)
self._state = False
except Exception as error:
_LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error)
# Check if the error is due to timeout
if "timed out" in str(error):
_LOGGER.warning(
"Computer at %s does not respond to the SSH request. Possible causes: might be offline, "
"the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.",
self._host
)
self._state = False
self.computer.change_audio_config(volume, mute, input_device, output_device)
def debug_info(self) -> ServiceResponse:
"""Prints debug info."""
return utils.get_debug_info(self._connection)
return computer_utils.get_debug_info(self.computer)
def update(self) -> None:
"""Ping the computer to see if it is online and update the state."""
self._state = self.computer.is_on()
# Update the state attributes and the connection only if the computer is on
if self._state:
self._attr_extra_state_attributes = {
"operating_system": self.computer.get_operating_system(),
"operating_system_version": self.computer.get_operating_system_version(),
"mac_address": self.computer.mac,
"ip_address": self.computer.host,
"connected_devices": self.computer.get_bluetooth_devices(as_str=True),
}

View File

@ -11,34 +11,6 @@ _LOGGER = logging.getLogger(__name__)
# _LOGGER.setLevel(logging.DEBUG)
def create_ssh_connection(host: str, username: str, password: str, port=22):
"""Create an SSH connection to a host using a username and password specified in the config flow."""
conf = fabric2.Config()
conf.run.hide = True
conf.run.warn = True
conf.warn = True
conf.sudo.password = password
conf.password = password
connection = Connection(
host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password},
config=conf
)
_LOGGER.info("Successfully created SSH connection to %s using username %s", host, username)
return connection
def test_connection(connection: Connection):
"""Test the connection to the host by running a simple command."""
try:
connection.run('ls')
return True
except Exception:
return False
def is_unix_system(connection: Connection):
"""Return a boolean based on get_operating_system result."""
return get_operating_system(connection) == "Linux/Unix"
@ -237,7 +209,7 @@ def change_monitors_config(connection: Connection, monitors_config: dict):
command = ' '.join(command_parts)
_LOGGER.debug("Running command: %s", command)
result = connection.run(command)
result = connection.run_action(command)
if result.return_code == 0:
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
@ -459,43 +431,6 @@ def change_audio_config(connection: Connection, volume: int, mute: bool, input_d
raise HomeAssistantError("Not implemented yet for Windows OS.")
def get_debug_info(connection: Connection):
"""Return debug information about the host system."""
data = {}
data_os = {
'name': get_operating_system(connection),
'version': get_operating_system_version(connection),
'is_unix': is_unix_system(connection)
}
data_ssh = {
'is_connected': connection.is_connected,
'username': connection.user,
'host': connection.host,
'port': connection.port
}
data_grub = {
'windows_entry': get_windows_entry_in_grub(connection)
}
data_audio = {
'speakers': get_audio_config(connection).get('sinks'),
'microphones': get_audio_config(connection).get('sources')
}
data['os'] = data_os
data['ssh'] = data_ssh
data['grub'] = data_grub
data['audio'] = data_audio
data['monitors'] = get_monitors_config(connection)
data['bluetooth_devices'] = get_bluetooth_devices(connection)
return data
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False) -> []:
commands = {
"unix": "bluetoothctl info",