reformat, implement audio, monitors, bluetooth parse

This commit is contained in:
Mathieu Broillet 2024-08-26 19:16:44 +02:00
parent 3fc5fb948f
commit 3a3775f457
Signed by: mathieu
GPG Key ID: A08E484FE95074C1
3 changed files with 249 additions and 81 deletions

View File

@ -1,13 +1,17 @@
import asyncio import asyncio
from enum import Enum
from typing import Optional, Dict, Any
import asyncssh import asyncssh
from asyncssh import SSHClientConnection from asyncssh import SSHClientConnection
from wakeonlan import send_magic_packet from wakeonlan import send_magic_packet
from custom_components.easy_computer_manager import const, _LOGGER from custom_components.easy_computer_manager import const, LOGGER
from custom_components.easy_computer_manager.computer_utils import parse_gnome_monitors_config, \
parse_pactl_audio_config, parse_bluetoothctl
class OSType: class OSType(str, Enum):
WINDOWS = "windows" WINDOWS = "windows"
LINUX = "linux" LINUX = "linux"
MACOS = "macos" MACOS = "macos"
@ -16,7 +20,7 @@ class OSType:
class Computer: class Computer:
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22, def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
dualboot: bool = False) -> None: dualboot: bool = False) -> None:
"""Init computer.""" """Initialize the Computer object."""
self.host = host self.host = host
self.mac = mac self.mac = mac
self._username = username self._username = username
@ -24,18 +28,18 @@ class Computer:
self._port = port self._port = port
self._dualboot = dualboot self._dualboot = dualboot
self._operating_system = None self._operating_system: Optional[OSType] = None
self._operating_system_version = None self._operating_system_version: Optional[str] = None
self._windows_entry_grub = None self._windows_entry_grub: Optional[str] = None
self._monitors_config = None self._monitors_config: Optional[Dict[str, Any]] = None
self._audio_config = None self._audio_config: Dict[str, Optional[Dict]] = {}
self._bluetooth_devices = None self._bluetooth_devices: Dict[str, Any] = {}
self._connection: SSHClientConnection | None = None self._connection: Optional[SSHClientConnection] = None
asyncio.create_task(self.update()) asyncio.create_task(self.update())
async def _open_ssh_connection(self) -> asyncssh.SSHClientConnection: async def _connect(self) -> None:
"""Open an asynchronous SSH connection.""" """Open an asynchronous SSH connection."""
try: try:
client = await asyncssh.connect( client = await asyncssh.connect(
@ -45,38 +49,71 @@ class Computer:
port=self._port, port=self._port,
known_hosts=None known_hosts=None
) )
# TODO: implement key auth client_keys=['my_ssh_key'] (mixed field detect if key or pass)
asyncssh.set_log_level("ERROR") asyncssh.set_log_level("ERROR")
return client self._connection = client
except (OSError, asyncssh.Error) as exc: except (OSError, asyncssh.Error) as exc:
_LOGGER.error(f"SSH connection failed: {exc}") raise ValueError(f"Failed to connect to {self.host}: {exc}")
return None
async def _close_ssh_connection(self) -> None: async def _renew_connection(self) -> None:
"""Close the SSH connection.""" """Renew the SSH connection if it is closed."""
if self._connection is not None: if self._connection is None or self._connection.is_closed:
self._connection.close() self._connection = await self._connect()
await self._connection.wait_closed()
async def update(self) -> None: async def update(self) -> None:
"""Setup method that opens an SSH connection and runs setup commands asynchronously.""" """Update computer details."""
# Open SSH connection or renew it if it is closed await self._renew_connection()
if self._connection is None or self._connection.is_closed:
await self._close_ssh_connection()
self._connection = await self._open_ssh_connection()
self._operating_system = OSType.LINUX if (await self.run_manually("uname")).get( async def update_operating_system():
"return_code") == 0 else OSType.WINDOWS # TODO: improve this self._operating_system = await self._detect_operating_system()
self._operating_system_version = (await self.run_action("operating_system_version")).get("output")
self._windows_entry_grub = (await self.run_action("get_windows_entry_grub")).get("output") async def update_operating_system_version():
self._monitors_config = (await self.run_action("get_monitors_config")).get("output") self._operating_system_version = (await self.run_action("operating_system_version")).get("output")
self._audio_config = {'speakers': None, 'microphones': None}
self._bluetooth_devices = {} async def update_windows_entry_grub():
self._windows_entry_grub = (await self.run_action("get_windows_entry_grub")).get("output")
async def update_monitors_config():
monitors_config = (await self.run_action("get_monitors_config")).get("output")
if self._operating_system == OSType.LINUX:
# TODO: add compatibility for KDE/others
self._monitors_config = parse_gnome_monitors_config(monitors_config)
elif self._operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
async def update_audio_config():
speakers_config = (await self.run_action("get_speakers")).get("output")
microphones_config = (await self.run_action("get_microphones")).get("output")
if self._operating_system == OSType.LINUX:
self._audio_config = parse_pactl_audio_config(speakers_config, microphones_config)
elif self._operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
async def update_bluetooth_devices():
bluetooth_config = await self.run_action("get_bluetooth_devices")
if self._operating_system == OSType.LINUX:
self._bluetooth_devices = parse_bluetoothctl(bluetooth_config)
elif self._operating_system == OSType.WINDOWS:
# TODO: implement for Windows
pass
await update_operating_system()
await update_operating_system_version()
await update_windows_entry_grub()
await update_monitors_config()
await update_audio_config()
await update_bluetooth_devices()
async def _detect_operating_system(self) -> OSType:
"""Detect the operating system of the computer."""
uname_result = await self.run_manually("uname")
return OSType.LINUX if uname_result.get("return_code") == 0 else OSType.WINDOWS
# Getters
async def is_on(self, timeout: int = 1) -> bool: async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on (ping).""" """Check if the computer is on by pinging it."""
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)] ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*ping_cmd, *ping_cmd,
@ -86,7 +123,7 @@ class Computer:
await proc.communicate() await proc.communicate()
return proc.returncode == 0 return proc.returncode == 0
def get_operating_system(self) -> str: def get_operating_system(self) -> OSType:
"""Get the operating system of the computer.""" """Get the operating system of the computer."""
return self._operating_system return self._operating_system
@ -98,19 +135,19 @@ class Computer:
"""Get the Windows entry in the GRUB configuration file.""" """Get the Windows entry in the GRUB configuration file."""
return self._windows_entry_grub return self._windows_entry_grub
def get_monitors_config(self) -> str: def get_monitors_config(self) -> Dict[str, Any]:
"""Get the monitors configuration of the computer.""" """Get the monitors configuration of the computer."""
return self._monitors_config return self._monitors_config
def get_speakers(self) -> str: def get_speakers(self) -> dict:
"""Get the audio configuration of the computer.""" """Get the speakers configuration of the computer."""
return self._audio_config.get("speakers") return self._audio_config.get("speakers")
def get_microphones(self) -> str: def get_microphones(self) -> dict:
"""Get the audio configuration of the computer.""" """Get the microphones configuration of the computer."""
return self._audio_config.get("microphones") return self._audio_config.get("microphones")
def get_bluetooth_devices(self, as_str: bool = False) -> str: def get_bluetooth_devices(self, return_as_string: bool = False) -> Dict[str, Any]:
"""Get the Bluetooth devices of the computer.""" """Get the Bluetooth devices of the computer."""
return self._bluetooth_devices return self._bluetooth_devices
@ -118,7 +155,6 @@ class Computer:
"""Check if the computer is dualboot.""" """Check if the computer is dualboot."""
return self._dualboot return self._dualboot
# Actions
async def start(self) -> None: async def start(self) -> None:
"""Start the computer.""" """Start the computer."""
send_magic_packet(self.mac) send_magic_packet(self.mac)
@ -135,68 +171,80 @@ class Computer:
"""Put the computer to sleep.""" """Put the computer to sleep."""
await self.run_action("sleep") await self.run_action("sleep")
async def change_monitors_config(self, monitors_config: dict) -> None: async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Change the monitors configuration."""
# Implementation needed
pass pass
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None, input_device: str | None = None,
output_device: str | None = None) -> None: output_device: str | None = None) -> None:
"""Change the audio configuration."""
# Implementation needed
pass pass
async def install_nircmd(self) -> None: async def install_nircmd(self) -> None:
"""Install NirCmd tool (Windows specific)."""
# Implementation needed
pass pass
async def start_steam_big_picture(self) -> None: async def start_steam_big_picture(self) -> None:
"""Start Steam Big Picture mode."""
# Implementation needed
pass pass
async def stop_steam_big_picture(self) -> None: async def stop_steam_big_picture(self) -> None:
"""Stop Steam Big Picture mode."""
# Implementation needed
pass pass
async def exit_steam_big_picture(self) -> None: async def exit_steam_big_picture(self) -> None:
"""Exit Steam Big Picture mode."""
# Implementation needed
pass pass
async def run_action(self, action: str, params=None) -> dict: async def run_action(self, action: str, params: Dict[str, Any] = None, exit: bool = None) -> Dict[str, Any]:
"""Run a command via SSH. Opens a new connection for each command.""" """Run a predefined command via SSH."""
if params is None: if params is None:
params = {} params = {}
if action in const.ACTIONS: if action not in const.ACTIONS:
command_template = const.ACTIONS[action] return {"output": "", "error": "Action not found", "return_code": 1}
# Check if the command has the required parameters command_template = const.ACTIONS[action]
if "params" in command_template:
if sorted(command_template["params"]) != sorted(params.keys()):
raise ValueError("Invalid parameters")
# Check if the command is available for the operating system if "params" in command_template and sorted(command_template["params"]) != sorted(params.keys()):
if self._operating_system in command_template: raise ValueError(f"Invalid parameters for action: {action}")
match self._operating_system:
case OSType.WINDOWS:
commands = command_template[OSType.WINDOWS]
case OSType.LINUX:
commands = command_template[OSType.LINUX]
case _:
raise ValueError("Invalid operating system")
for command in commands: if "exit" in command_template and exit is None:
# Replace the parameters in the command exit = command_template["exit"]
for param, value in params.items():
command = command.replace(f"%{param}%", value)
result = await self.run_manually(command) commands = command_template.get(self._operating_system)
if not commands:
raise ValueError(f"Action not supported for OS: {self._operating_system}")
if result['return_code'] == 0: result = None
_LOGGER.debug(f"Command successful: {command}") for command in commands:
return result for param, value in params.items():
else: command = command.replace(f"%{param}%", value)
_LOGGER.debug(f"Command failed: {command}")
return {"output": "", "error": "", "return_code": 1} result = await self.run_manually(command)
if result['return_code'] == 0:
LOGGER.debug(f"Command successful: {command}")
return result
async def run_manually(self, command: str) -> dict: LOGGER.debug(f"Command failed: {command}")
"""Run a command manually (not from predefined commands)."""
if exit:
raise ValueError(f"Failed to run action: {action}")
return result
async def run_manually(self, command: str) -> Dict[str, Any]:
"""Run a custom command manually via SSH."""
if not self._connection:
await self._connect()
result = await self._connection.run(command) result = await self._connection.run(command)
return {"output": result.stdout, "error": result.stderr, return {"output": result.stdout, "error": result.stderr, "return_code": result.exit_status}
"return_code": result.exit_status}

View File

@ -1,9 +1,9 @@
import re import re
from custom_components.easy_computer_manager.computer import Computer from custom_components.easy_computer_manager import LOGGER
async def format_debug_informations(computer: Computer): async def format_debug_informations(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
"""Return debug information about the host system.""" """Return debug information about the host system."""
data = { data = {
@ -33,8 +33,18 @@ async def format_debug_informations(computer: Computer):
return data return data
def parse_gnome_monitors_config(config: str): def parse_gnome_monitors_config(config: str) -> list:
"""Parse the GNOME monitors configuration.""" """
Parse the GNOME monitors configuration.
:param config:
The output of the gnome-monitor-config list command.
:type config: str
:returns: list
The parsed monitors configuration.
"""
monitors = [] monitors = []
current_monitor = None current_monitor = None
@ -81,3 +91,113 @@ def parse_gnome_monitors_config(config: str):
monitors.append(current_monitor) monitors.append(current_monitor)
return monitors return monitors
def parse_pactl_audio_config(config_speakers: str, config_microphones: str) -> dict[str, list]:
"""
Parse the pactl audio configuration.
:param config_speakers:
The output of the pactl list sinks command.
:param config_microphones:
The output of the pactl list sources command.
:type config_speakers: str
:type config_microphones: str
:returns: dict
The parsed audio configuration.
"""
config = {'speakers': [], 'microphones': []}
def parse_device_info(lines, device_type):
devices = []
current_device = {}
for line in lines:
if line.startswith(f"{device_type} #"):
if current_device and "Monitor" not in current_device['description']:
devices.append(current_device)
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
elif line.startswith(" Name:"):
current_device['name'] = line.split(":")[1].strip()
elif line.startswith(" State:"):
current_device['state'] = line.split(":")[1].strip()
elif line.startswith(" Description:"):
current_device['description'] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
return devices
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
return config
def parse_bluetoothctl(command: dict, connected_devices_only: bool = True,
return_as_string: bool = False) -> list | str:
"""Parse the bluetoothctl info command.
:param command:
The command output.
:param connected_devices_only:
Only return connected devices.
Will return all devices, connected or not, if False.
:param return_as_string:
Return the devices as a string (i.e. for device info in the UI).
:type command: dict
:type connected_devices_only: bool
:type return_as_string: bool
:returns: str | list
The parsed bluetooth devices.
"""
if command.get('return_code') != 0:
if command.get("output").__contains__("Missing device address argument"): # Means no devices are connected
return "" if return_as_string else []
else:
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
return "" if return_as_string else []
devices = []
current_device = None
for line in command.get("output").split('\n'):
if line.startswith('Device'):
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
current_device = line.split()[1]
current_name = None
current_connected = None
elif 'Name:' in line:
current_name = line.split(': ', 1)[1]
elif 'Connected:' in line:
current_connected = line.split(': ')[1] == 'yes'
# Add the last device if any
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
if connected_devices_only:
devices = [device for device in devices if device["connected"] == "yes"]
if return_as_string:
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
return devices

View File

@ -45,10 +45,10 @@ ACTIONS = {
"linux": ["gnome-monitor-config list"] "linux": ["gnome-monitor-config list"]
}, },
"get_speakers": { "get_speakers": {
"linux": ["pactl list sinks"] "linux": ["LANG=en_US.UTF-8 pactl list sinks"]
}, },
"get_microphones": { "get_microphones": {
"linux": ["pactl list sources"] "linux": ["LANG=en_US.UTF-8 pactl list sources"]
}, },
"get_bluetooth_devices": { "get_bluetooth_devices": {
"exit": False, "exit": False,