convert all to async (somehow it works ?? :)), update reqs and manifest

This commit is contained in:
Mathieu Broillet 2024-08-26 14:29:09 +02:00
parent 0f72986d2a
commit 7e6736c8b3
Signed by: mathieu
GPG Key ID: A08E484FE95074C1
5 changed files with 128 additions and 135 deletions

View File

@ -1,10 +1,8 @@
import subprocess as sp
import fabric2
import wakeonlan
from fabric2 import Connection
import asyncio
import asyncssh
from custom_components.easy_computer_manager import const, _LOGGER
from wakeonlan import send_magic_packet
class OSType:
@ -31,53 +29,54 @@ class Computer:
self._audio_config = None
self._bluetooth_devices = None
self._connection = None
asyncio.create_task(self.update())
self.setup()
def _open_ssh_connection(self) -> Connection:
"""Open an SSH connection."""
conf = fabric2.Config()
conf.run.hide = True
conf.run.warn = True
conf.warn = True
conf.sudo.password = self._password
conf.password = self._password
client = Connection(
host=self.host, user=self._username, port=self._port, connect_timeout=3,
connect_kwargs={"password": self._password},
config=conf
async def _open_ssh_connection(self) -> asyncssh.SSHClientConnection:
"""Open an asynchronous SSH connection."""
try:
client = await asyncssh.connect(
self.host,
username=self._username,
password=self._password,
port=self._port,
known_hosts=None
)
self._connection = client
return client
except (OSError, asyncssh.Error) as exc:
_LOGGER.error(f"SSH connection failed: {exc}")
return None
def _close_ssh_connection(self, client: Connection) -> None:
async def _close_ssh_connection(self, client: asyncssh.SSHClientConnection) -> None:
"""Close the SSH connection."""
if client.is_connected:
client.close()
def setup(self):
"""Setup method that opens an SSH connection and keeps it open for subsequent setup commands."""
client = self._open_ssh_connection()
async def update(self) -> None:
"""Setup method that opens an SSH connection and runs setup commands asynchronously."""
client = await self._open_ssh_connection()
if not client:
return
# TODO: run commands here
self._operating_system = OSType.LINUX # if self.run_manually("uname").return_code == 0 else OSType.WINDOWS # TODO: improve this
self._operating_system_version = self.run_action("operating_system_version").output
self._windows_entry_grub = self.run_action("get_windows_entry_grub").output
self._operating_system = OSType.LINUX if (await self.run_manually("uname")).get(
"return_code") == 0 else OSType.WINDOWS # TODO: improve this
self._operating_system_version = (await self.run_action("operating_system_version")).get("output")
self._windows_entry_grub = (await self.run_action("get_windows_entry_grub")).get("output")
self._monitors_config = {}
self._audio_config = {'speakers': None, 'microphones': None}
self._bluetooth_devices = {}
self._close_ssh_connection(client)
await self._close_ssh_connection(client)
# Getters
def is_on(self, timeout: int = 1) -> bool:
async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on (ping)."""
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
return status == 0
proc = await asyncio.create_subprocess_exec(
*ping_cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await proc.communicate()
return proc.returncode == 0
def get_operating_system(self) -> str:
"""Get the operating system of the computer."""
@ -97,11 +96,11 @@ class Computer:
def get_speakers(self) -> str:
"""Get the audio configuration of the computer."""
return self._audio_config.speakers
return self._audio_config.get("speakers")
def get_microphones(self) -> str:
"""Get the audio configuration of the computer."""
return self._audio_config.microphones
return self._audio_config.get("microphones")
def get_bluetooth_devices(self, as_str: bool = False) -> str:
"""Get the Bluetooth devices of the computer."""
@ -112,50 +111,48 @@ class Computer:
return self._dualboot
# Actions
def start(self) -> None:
async def start(self) -> None:
"""Start the computer."""
wakeonlan.send_magic_packet(self.mac)
send_magic_packet(self.mac)
def shutdown(self) -> None:
async def shutdown(self) -> None:
"""Shutdown the computer."""
self.run_action("shutdown")
await self.run_action("shutdown")
def restart(self, from_os: OSType = None, to_os: OSType = None) -> None:
async def restart(self, from_os: OSType = None, to_os: OSType = None) -> None:
"""Restart the computer."""
self.run_action("restart")
await self.run_action("restart")
def put_to_sleep(self) -> None:
async def put_to_sleep(self) -> None:
"""Put the computer to sleep."""
self.run_action("sleep")
await self.run_action("sleep")
def change_monitors_config(self, monitors_config: dict) -> None:
async def change_monitors_config(self, monitors_config: dict) -> None:
pass
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
pass
def install_nircmd(self) -> None:
async def install_nircmd(self) -> None:
pass
def start_steam_big_picture(self) -> None:
async def start_steam_big_picture(self) -> None:
pass
def stop_steam_big_picture(self) -> None:
async def stop_steam_big_picture(self) -> None:
pass
def exit_steam_big_picture(self) -> None:
async def exit_steam_big_picture(self) -> None:
pass
def run_action(self, action: str, params=None) -> dict:
async def run_action(self, action: str, params=None) -> dict:
"""Run a command via SSH. Opens a new connection for each command."""
if params is None:
params = {}
if action not in const.COMMANDS:
_LOGGER.error(f"Invalid action: {action}")
return {}
if action in const.COMMANDS:
command_template = const.COMMANDS[action]
# Check if the command has the required parameters
@ -164,6 +161,7 @@ class Computer:
raise ValueError("Invalid parameters")
# Check if the command is available for the operating system
if self._operating_system in command_template:
match self._operating_system:
case OSType.WINDOWS:
commands = command_template[OSType.WINDOWS]
@ -177,7 +175,7 @@ class Computer:
for param, value in params.items():
command = command.replace(f"%{param}%", value)
result = self.run_manually(command)
result = await self.run_manually(command)
if result['return_code'] == 0:
_LOGGER.debug(f"Command successful: {command}")
@ -185,15 +183,18 @@ class Computer:
else:
_LOGGER.debug(f"Command failed: {command}")
return {}
return {"output": "", "error": "", "return_code": 1}
def run_manually(self, command: str) -> dict:
async def run_manually(self, command: str) -> dict:
"""Run a command manually (not from predefined commands)."""
# Open SSH connection, execute command, and close connection
client = self._open_ssh_connection()
result = client.run(command)
self._close_ssh_connection(client)
client = await self._open_ssh_connection()
if not client:
return {"output": "", "error": "SSH connection failed", "return_code": 1}
result = await client.run(command)
await self._close_ssh_connection(client)
return {"output": result.stdout, "error": result.stderr,
"return_code": result.return_code}
"return_code": result.exit_status}

View File

@ -1,7 +1,7 @@
from custom_components.easy_computer_manager.computer import Computer
def get_debug_info(computer: Computer):
async def get_debug_info(computer: Computer):
"""Return debug information about the host system."""
data = {
@ -15,7 +15,7 @@ def get_debug_info(computer: Computer):
'username': computer._username,
'port': computer._port,
'dualboot': computer._dualboot,
'is_on': computer.is_on()
'is_on': await computer.is_on()
},
'grub':{
'windows_entry': computer.get_windows_entry_grub()

View File

@ -2,8 +2,7 @@
"domain": "easy_computer_manager",
"name": "Easy Computer Manager",
"codeowners": [
"@M4TH1EU",
"@ntilley905"
"@M4TH1EU"
],
"config_flow": true,
"dependencies": [],
@ -11,10 +10,8 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
"requirements": [
"construct==2.10.70",
"wakeonlan==3.1.0",
"fabric2==3.2.2",
"paramiko==3.4.0"
"asyncssh==2.16.0"
],
"version": "1.3.0"
"version": "2.0.0"
}

View File

@ -17,7 +17,6 @@ from homeassistant.const import (
CONF_PORT,
CONF_USERNAME, )
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import (
device_registry as dr,
entity_platform,
@ -26,7 +25,7 @@ from homeassistant.helpers.config_validation import make_entity_service_schema
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import utils, computer_utils
from . import computer_utils
from .computer import Computer, OSType
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
@ -150,74 +149,72 @@ class ComputerSwitch(SwitchEntity):
def is_on(self) -> bool:
return self._state
def turn_on(self, **kwargs: Any) -> None:
self.computer.start()
async def turn_on(self, **kwargs: Any) -> None:
await self.computer.start()
if self._attr_assumed_state:
self._state = True
self.async_write_ha_state()
def turn_off(self, **kwargs: Any) -> None:
self.computer.shutdown()
async def turn_off(self, **kwargs: Any) -> None:
await self.computer.shutdown()
if self._attr_assumed_state:
self._state = False
self.async_write_ha_state()
# Services
def restart_to_windows_from_linux(self) -> None:
async def restart_to_windows_from_linux(self) -> None:
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
self.computer.restart(OSType.LINUX, OSType.WINDOWS)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
def restart_to_linux_from_windows(self) -> None:
async def restart_to_linux_from_windows(self) -> None:
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
self.computer.restart()
await self.computer.restart()
def put_computer_to_sleep(self) -> None:
self.computer.setup()
async def put_computer_to_sleep(self) -> None:
await self.computer.put_to_sleep()
def start_computer_to_windows(self) -> None:
async def start_computer_to_windows(self) -> None:
async def wait_task():
while not self.is_on:
pass
# await asyncio.sleep(3)
await asyncio.sleep(3)
self.computer.restart(OSType.LINUX, OSType.WINDOWS)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
self.computer.start()
await self.computer.start()
self._hass.loop.create_task(wait_task())
def restart_computer(self) -> None:
self.computer.restart()
async def restart_computer(self) -> None:
await self.computer.restart()
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
async def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""Change the monitors configuration using a YAML config file."""
self.computer.change_monitors_config(monitors_config)
await self.computer.change_monitors_config(monitors_config)
def steam_big_picture(self, action: str) -> None:
async def steam_big_picture(self, action: str) -> None:
match action:
case "start":
self.computer.start_steam_big_picture()
await self.computer.start_steam_big_picture()
case "stop":
self.computer.stop_steam_big_picture()
await self.computer.stop_steam_big_picture()
case "exit":
self.computer.exit_steam_big_picture()
await self.computer.exit_steam_big_picture()
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
"""Change the audio configuration using a YAML config file."""
self.computer.change_audio_config(volume, mute, input_device, output_device)
await self.computer.change_audio_config(volume, mute, input_device, output_device)
def debug_info(self) -> ServiceResponse:
async def debug_info(self) -> ServiceResponse:
"""Prints debug info."""
return computer_utils.get_debug_info(self.computer)
return await computer_utils.get_debug_info(self.computer)
def update(self) -> None:
async def async_update(self) -> None:
"""Ping the computer to see if it is online and update the state."""
self._state = self.computer.is_on()
self._state = await self.computer.is_on()
# Update the state attributes and the connection only if the computer is on
if self._state:

View File

@ -1,5 +1,3 @@
fabric2~=3.2.2
paramiko~=3.4.0
voluptuous~=0.14.2
wakeonlan~=3.1.0
homeassistant~=2024.3.1
asyncssh~=2.16.0