implement monitor config, rework of commands def and other fixes/improvements

This commit is contained in:
Mathieu Broillet 2024-08-26 22:14:00 +02:00
parent 8f2f03f134
commit 3e13d1ddb7
Signed by: mathieu
GPG Key ID: A08E484FE95074C1
6 changed files with 323 additions and 266 deletions

View File

@ -7,8 +7,9 @@ from wakeonlan import send_magic_packet
from custom_components.easy_computer_manager import const, LOGGER from custom_components.easy_computer_manager import const, LOGGER
from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput
from custom_components.easy_computer_manager.computer.utils import parse_gnome_monitors_output, \ from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args
parse_pactl_output, parse_bluetoothctl from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \
parse_bluetoothctl
class Computer: class Computer:
@ -66,7 +67,7 @@ class Computer:
self.operating_system_version = (await self.run_action("operating_system_version")).output self.operating_system_version = (await self.run_action("operating_system_version")).output
async def update_desktop_environment(): async def update_desktop_environment():
self.desktop_environment = (await self.run_action("desktop_environment")).output self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
async def update_windows_entry_grub(): async def update_windows_entry_grub():
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
@ -111,6 +112,14 @@ class Computer:
uname_result = await self.run_manually("uname") uname_result = await self.run_manually("uname")
return OSType.LINUX if uname_result.successful() else OSType.WINDOWS return OSType.LINUX if uname_result.successful() else OSType.WINDOWS
def is_windows(self) -> bool:
"""Check if the computer is running Windows."""
return self.operating_system == OSType.WINDOWS
def is_linux(self) -> bool:
"""Check if the computer is running Linux."""
return self.operating_system == OSType.LINUX
async def is_on(self, timeout: int = 1) -> bool: async def is_on(self, timeout: int = 1) -> bool:
"""Check if the computer is on by pinging it.""" """Check if the computer is on by pinging it."""
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)] ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
@ -138,14 +147,17 @@ class Computer:
"""Put the computer to sleep.""" """Put the computer to sleep."""
await self.run_action("sleep") await self.run_action("sleep")
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None: async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
"""Change the monitors configuration.""" """Change the monitors configuration."""
# Implementation needed if self.is_linux():
pass # TODO: other DE support
if self.desktop_environment == 'gnome':
await self.run_action("set_monitors_config",
params={"args": format_gnome_monitors_args(monitors_config)})
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, async def set_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None, input_device: str | None = None,
output_device: str | None = None) -> None: output_device: str | None = None) -> None:
"""Change the audio configuration.""" """Change the audio configuration."""
# Implementation needed # Implementation needed
pass pass
@ -155,57 +167,63 @@ class Computer:
# Implementation needed # Implementation needed
pass pass
async def start_steam_big_picture(self) -> None: async def steam_big_picture(self, action: str) -> None:
"""Start Steam Big Picture mode.""" """Start, stop or exit Steam Big Picture mode."""
# Implementation needed # Implementation needed
pass pass
async def stop_steam_big_picture(self) -> None: async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput:
"""Stop Steam Big Picture mode."""
# Implementation needed
pass
async def exit_steam_big_picture(self) -> None:
"""Exit Steam Big Picture mode."""
# Implementation needed
pass
async def run_action(self, action: str, params: Dict[str, Any] = None, exit: bool = None) -> CommandOutput:
"""Run a predefined command via SSH.""" """Run a predefined command via SSH."""
if params is None: if params is None:
params = {} params = {}
if id not in const.ACTIONS:
if action not in const.ACTIONS:
return CommandOutput("", 1, "", "Action not found") return CommandOutput("", 1, "", "Action not found")
command_template = const.ACTIONS[action] action = const.ACTIONS[id]
if "params" in command_template and sorted(command_template["params"]) != sorted(params.keys()): if self.operating_system.lower() in action:
raise ValueError(f"Invalid parameters for action: {action}") raise_on_error = action.get("raise_on_error", raise_on_error)
if "exit" in command_template and exit is None: os_data = action.get(self.operating_system.lower())
exit = command_template["exit"] if isinstance(os_data, list):
commands = os_data
req_params = []
elif isinstance(os_data, dict):
if "command" in os_data or "commands" in os_data:
commands = os_data.get("commands", [os_data.get("command")])
req_params = os_data.get("params", [])
raise_on_error = os_data.get("raise_on_error", raise_on_error)
elif self.desktop_environment in os_data:
commands = os_data.get(self.desktop_environment).get("commands", [
os_data.get(self.desktop_environment).get("command")])
req_params = os_data.get(self.desktop_environment).get("params", [])
raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error)
else:
raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}")
else:
raise ValueError(f"Action {id} misconfigured/bad format")
commands = command_template.get(self.operating_system.lower()) if sorted(req_params) != sorted(params.keys()):
if not commands: raise ValueError(f"Invalid/missing parameters for action: {id}")
raise ValueError(f"Action not supported for OS: {self.operating_system}")
command_result = None command_result = None
for command in commands: for command in commands:
for param, value in params.items(): for param, value in params.items():
command = command.replace(f"%{param}%", value) command = command.replace(f"%{param}%", value)
command_result = await self.run_manually(command) command_result = await self.run_manually(command)
if command_result.successful(): if command_result.successful():
LOGGER.debug(f"Command successful: {command}") LOGGER.debug(f"Command successful: {command}")
return command_result return command_result
else:
LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}")
if raise_on_error:
raise ValueError(f"Command failed: {command}")
LOGGER.debug(f"Command failed: {command}") return command_result
if exit: else:
raise ValueError(f"Failed to run action: {action}") raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
return command_result
async def run_manually(self, command: str) -> CommandOutput: async def run_manually(self, command: str) -> CommandOutput:
"""Run a custom command manually via SSH.""" """Run a custom command manually via SSH."""

View File

@ -0,0 +1,22 @@
def format_gnome_monitors_args(monitors_config: dict):
args = []
monitors_config = monitors_config.get('monitors_config', {})
for monitor, settings in monitors_config.items():
if settings.get('enabled', False):
args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
if 'position' in settings:
args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
if 'mode' in settings:
args.extend(['-m', settings["mode"]])
if 'scale' in settings:
args.extend(['-s', str(settings["scale"])])
if 'transform' in settings:
args.extend(['-t', settings["transform"]])
return ' '.join(args)

View File

@ -0,0 +1,168 @@
import re
from custom_components.easy_computer_manager import LOGGER
from custom_components.easy_computer_manager.computer import CommandOutput
def parse_gnome_monitors_output(config: str) -> list:
"""
Parse the GNOME monitors configuration.
:param config:
The output of the gnome-monitor-config list command.
:type config: str
:returns: list
The parsed monitors configuration.
"""
monitors = []
current_monitor = None
for line in config.split('\n'):
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
if monitor_match:
if current_monitor:
monitors.append(current_monitor)
source, status = monitor_match.groups()
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
elif current_monitor:
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
if display_name_match:
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
elif resolution_match:
# Don't include resolutions under 1280x720
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
# If there are already resolutions in the list, check if the framerate between the last is >1
if len(current_monitor['resolutions']) > 0:
last_resolution = current_monitor['resolutions'][-1]
last_resolution_size = last_resolution.split('@')[0]
this_resolution_size = resolution_match.group(1).split('@')[0]
# Only truncate some framerates if the resolution are the same
if last_resolution_size == this_resolution_size:
last_resolution_framerate = float(last_resolution.split('@')[1])
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
# If the difference between the last resolution framerate and this one is >1, ignore it
if last_resolution_framerate - 1 > this_resolution_framerate:
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# If the resolution is different, this adds the new resolution
# to the list without truncating
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# This is the first resolution, add it to the list
current_monitor['resolutions'].append(resolution_match.group(1))
if current_monitor:
monitors.append(current_monitor)
return monitors
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
"""
Parse the pactl audio configuration.
:param config_speakers:
The output of the pactl list sinks command.
:param config_microphones:
The output of the pactl list sources command.
:type config_speakers: str
:type config_microphones: str
:returns: dict
The parsed audio configuration.
"""
config = {'speakers': [], 'microphones': []}
def parse_device_info(lines, device_type):
devices = []
current_device = {}
for line in lines:
if line.startswith(f"{device_type} #"):
if current_device and "Monitor" not in current_device['description']:
devices.append(current_device)
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
elif line.startswith(" Name:"):
current_device['name'] = line.split(":")[1].strip()
elif line.startswith(" State:"):
current_device['state'] = line.split(":")[1].strip()
elif line.startswith(" Description:"):
current_device['description'] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
return devices
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
return config
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
return_as_string: bool = False) -> list | str:
"""Parse the bluetoothctl info command.
:param command:
The command output.
:param connected_devices_only:
Only return connected devices.
Will return all devices, connected or not, if False.
:type command: :class: CommandOutput
:type connected_devices_only: bool
:returns: str | list
The parsed bluetooth devices.
"""
if not command.successful():
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
return "" if return_as_string else []
else:
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
return "" if return_as_string else []
devices = []
current_device = None
for line in command.output.split('\n'):
if line.startswith('Device'):
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
current_device = line.split()[1]
current_name = None
current_connected = None
elif 'Name:' in line:
current_name = line.split(': ', 1)[1]
elif 'Connected:' in line:
current_connected = line.split(': ')[1] == 'yes'
# Add the last device if any
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
if connected_devices_only:
devices = [device for device in devices if device["connected"] == True]
return devices

View File

@ -1,9 +1,3 @@
import re
from custom_components.easy_computer_manager import LOGGER
from custom_components.easy_computer_manager.computer.common import CommandOutput
async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?) async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
"""Return debug information about the host system.""" """Return debug information about the host system."""
@ -35,165 +29,11 @@ async def format_debug_information(computer: 'Computer'): # importing Computer
return data return data
def parse_gnome_monitors_output(config: str) -> list: def get_bluetooth_devices_as_str(computer: 'Computer') -> str:
""" """Return the bluetooth devices as a string."""
Parse the GNOME monitors configuration. devices = computer.bluetooth_devices
:param config: if not devices:
The output of the gnome-monitor-config list command. return ""
:type config: str return "; ".join([f"{device['name']} ({device['address']})" for device in devices])
:returns: list
The parsed monitors configuration.
"""
monitors = []
current_monitor = None
for line in config.split('\n'):
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
if monitor_match:
if current_monitor:
monitors.append(current_monitor)
source, status = monitor_match.groups()
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
elif current_monitor:
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
if display_name_match:
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
elif resolution_match:
# Don't include resolutions under 1280x720
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
# If there are already resolutions in the list, check if the framerate between the last is >1
if len(current_monitor['resolutions']) > 0:
last_resolution = current_monitor['resolutions'][-1]
last_resolution_size = last_resolution.split('@')[0]
this_resolution_size = resolution_match.group(1).split('@')[0]
# Only truncate some framerates if the resolution are the same
if last_resolution_size == this_resolution_size:
last_resolution_framerate = float(last_resolution.split('@')[1])
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
# If the difference between the last resolution framerate and this one is >1, ignore it
if last_resolution_framerate - 1 > this_resolution_framerate:
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# If the resolution is different, this adds the new resolution
# to the list without truncating
current_monitor['resolutions'].append(resolution_match.group(1))
else:
# This is the first resolution, add it to the list
current_monitor['resolutions'].append(resolution_match.group(1))
if current_monitor:
monitors.append(current_monitor)
return monitors
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
"""
Parse the pactl audio configuration.
:param config_speakers:
The output of the pactl list sinks command.
:param config_microphones:
The output of the pactl list sources command.
:type config_speakers: str
:type config_microphones: str
:returns: dict
The parsed audio configuration.
"""
config = {'speakers': [], 'microphones': []}
def parse_device_info(lines, device_type):
devices = []
current_device = {}
for line in lines:
if line.startswith(f"{device_type} #"):
if current_device and "Monitor" not in current_device['description']:
devices.append(current_device)
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
elif line.startswith(" Name:"):
current_device['name'] = line.split(":")[1].strip()
elif line.startswith(" State:"):
current_device['state'] = line.split(":")[1].strip()
elif line.startswith(" Description:"):
current_device['description'] = line.split(":")[1].strip()
if current_device:
devices.append(current_device)
return devices
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
return config
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
return_as_string: bool = False) -> list | str:
"""Parse the bluetoothctl info command.
:param command:
The command output.
:param connected_devices_only:
Only return connected devices.
Will return all devices, connected or not, if False.
:type command: :class: CommandOutput
:type connected_devices_only: bool
:returns: str | list
The parsed bluetooth devices.
"""
if not command.successful():
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
return "" if return_as_string else []
else:
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
return "" if return_as_string else []
devices = []
current_device = None
for line in command.output.split('\n'):
if line.startswith('Device'):
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
current_device = line.split()[1]
current_name = None
current_connected = None
elif 'Name:' in line:
current_name = line.split(': ', 1)[1]
elif 'Connected:' in line:
current_connected = line.split(': ')[1] == 'yes'
# Add the last device if any
if current_device is not None:
devices.append({
"address": current_device,
"name": current_name,
"connected": current_connected
})
if connected_devices_only:
devices = [device for device in devices if device["connected"] == "yes"]
return devices

View File

@ -42,12 +42,22 @@ ACTIONS = {
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"] "sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"]
}, },
"set_grub_entry": { "set_grub_entry": {
"params": ["grub-entry"], "linux": {
"linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"] "commands": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"],
"params": ["grub-entry"],
}
}, },
"get_monitors_config": { "get_monitors_config": {
"linux": ["gnome-monitor-config list"] "linux": ["gnome-monitor-config list"]
}, },
"set_monitors_config": {
"linux": {
"gnome": {
"command": "gnome-monitor-config set %args%",
"params": ["args"]
}
}
},
"get_speakers": { "get_speakers": {
"linux": ["LANG=en_US.UTF-8 pactl list sinks"] "linux": ["LANG=en_US.UTF-8 pactl list sinks"]
}, },
@ -55,7 +65,9 @@ ACTIONS = {
"linux": ["LANG=en_US.UTF-8 pactl list sources"] "linux": ["LANG=en_US.UTF-8 pactl list sources"]
}, },
"get_bluetooth_devices": { "get_bluetooth_devices": {
"exit": False, "linux": {
"linux": ["bluetoothctl info"] "command": "bluetoothctl info",
"raise_on_error": False,
}
} }
} }

View File

@ -25,7 +25,7 @@ from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .computer import OSType, Computer from .computer import OSType, Computer
from .computer.utils import format_debug_information from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
@ -160,55 +160,6 @@ class ComputerSwitch(SwitchEntity):
self._state = False self._state = False
self.async_write_ha_state() self.async_write_ha_state()
# Services
async def restart_to_windows_from_linux(self) -> None:
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
async def restart_to_linux_from_windows(self) -> None:
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
await self.computer.restart()
async def put_computer_to_sleep(self) -> None:
await self.computer.put_to_sleep()
async def start_computer_to_windows(self) -> None:
async def wait_task():
while not self.is_on:
await asyncio.sleep(3)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
await self.computer.start()
self._hass.loop.create_task(wait_task())
async def restart_computer(self) -> None:
await self.computer.restart()
async def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""Change the monitors configuration using a YAML config file."""
await self.computer.change_monitors_config(monitors_config)
async def steam_big_picture(self, action: str) -> None:
match action:
case "start":
await self.computer.start_steam_big_picture()
case "stop":
await self.computer.stop_steam_big_picture()
case "exit":
await self.computer.exit_steam_big_picture()
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
"""Change the audio configuration using a YAML config file."""
await self.computer.change_audio_config(volume, mute, input_device, output_device)
async def debug_info(self) -> ServiceResponse:
"""Prints debug info."""
return await format_debug_information(self.computer)
async def async_update(self) -> None: async def async_update(self) -> None:
"""Ping the computer to see if it is online and update the state.""" """Ping the computer to see if it is online and update the state."""
self._state = await self.computer.is_on() self._state = await self.computer.is_on()
@ -222,6 +173,52 @@ class ComputerSwitch(SwitchEntity):
"operating_system_version": self.computer.operating_system_version, "operating_system_version": self.computer.operating_system_version,
"mac_address": self.computer.mac, "mac_address": self.computer.mac,
"ip_address": self.computer.host, "ip_address": self.computer.host,
"connected_devices": self.computer.bluetooth_devices, "connected_devices": get_bluetooth_devices_as_str(self.computer),
} }
# Services
async def restart_to_windows_from_linux(self) -> None:
"""(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
async def restart_to_linux_from_windows(self) -> None:
"""(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
await self.computer.restart()
async def put_computer_to_sleep(self) -> None:
"""(Service Handler) Put the computer to sleep."""
await self.computer.put_to_sleep()
async def start_computer_to_windows(self) -> None:
"""(Service Handler) Start the computer to Windows"""
async def wait_task():
while not self.is_on:
await asyncio.sleep(3)
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
await self.computer.start()
self._hass.loop.create_task(wait_task())
async def restart_computer(self) -> None:
"""(Service Handler) Restart the computer."""
await self.computer.restart()
async def change_monitors_config(self, monitors_config: dict | None = None) -> None:
"""(Service Handler) Change the monitors configuration using a YAML config file."""
await self.computer.set_monitors_config(monitors_config)
async def steam_big_picture(self, action: str) -> None:
"""(Service Handler) Control Steam Big Picture mode."""
await self.computer.steam_big_picture(action)
async def change_audio_config(self, volume: int | None = None, mute: bool | None = None,
input_device: str | None = None,
output_device: str | None = None) -> None:
"""(Service Handler) Change the audio configuration using a YAML config file."""
await self.computer.set_audio_config(volume, mute, input_device, output_device)
async def debug_info(self) -> ServiceResponse:
"""(Service Handler) Prints debug info."""
return await format_debug_information(self.computer)