diff --git a/custom_components/easy_computer_manager/computer/__init__.py b/custom_components/easy_computer_manager/computer/__init__.py index 1b9926c..7e01790 100644 --- a/custom_components/easy_computer_manager/computer/__init__.py +++ b/custom_components/easy_computer_manager/computer/__init__.py @@ -7,8 +7,9 @@ from wakeonlan import send_magic_packet from custom_components.easy_computer_manager import const, LOGGER from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput -from custom_components.easy_computer_manager.computer.utils import parse_gnome_monitors_output, \ - parse_pactl_output, parse_bluetoothctl +from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args +from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \ + parse_bluetoothctl class Computer: @@ -66,7 +67,7 @@ class Computer: self.operating_system_version = (await self.run_action("operating_system_version")).output async def update_desktop_environment(): - self.desktop_environment = (await self.run_action("desktop_environment")).output + self.desktop_environment = (await self.run_action("desktop_environment")).output.lower() async def update_windows_entry_grub(): self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output @@ -111,6 +112,14 @@ class Computer: uname_result = await self.run_manually("uname") return OSType.LINUX if uname_result.successful() else OSType.WINDOWS + def is_windows(self) -> bool: + """Check if the computer is running Windows.""" + return self.operating_system == OSType.WINDOWS + + def is_linux(self) -> bool: + """Check if the computer is running Linux.""" + return self.operating_system == OSType.LINUX + async def is_on(self, timeout: int = 1) -> bool: """Check if the computer is on by pinging it.""" ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)] @@ -138,14 +147,17 @@ class Computer: """Put the computer to sleep.""" await self.run_action("sleep") - async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None: + async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None: """Change the monitors configuration.""" - # Implementation needed - pass + if self.is_linux(): + # TODO: other DE support + if self.desktop_environment == 'gnome': + await self.run_action("set_monitors_config", + params={"args": format_gnome_monitors_args(monitors_config)}) - async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, - input_device: str | None = None, - output_device: str | None = None) -> None: + async def set_audio_config(self, volume: int | None = None, mute: bool | None = None, + input_device: str | None = None, + output_device: str | None = None) -> None: """Change the audio configuration.""" # Implementation needed pass @@ -155,57 +167,63 @@ class Computer: # Implementation needed pass - async def start_steam_big_picture(self) -> None: - """Start Steam Big Picture mode.""" + async def steam_big_picture(self, action: str) -> None: + """Start, stop or exit Steam Big Picture mode.""" # Implementation needed pass - async def stop_steam_big_picture(self) -> None: - """Stop Steam Big Picture mode.""" - # Implementation needed - pass - - async def exit_steam_big_picture(self) -> None: - """Exit Steam Big Picture mode.""" - # Implementation needed - pass - - async def run_action(self, action: str, params: Dict[str, Any] = None, exit: bool = None) -> CommandOutput: + async def run_action(self, id: str, params=None, raise_on_error: bool = None) -> CommandOutput: """Run a predefined command via SSH.""" if params is None: params = {} - - if action not in const.ACTIONS: + if id not in const.ACTIONS: return CommandOutput("", 1, "", "Action not found") - command_template = const.ACTIONS[action] + action = const.ACTIONS[id] - if "params" in command_template and sorted(command_template["params"]) != sorted(params.keys()): - raise ValueError(f"Invalid parameters for action: {action}") + if self.operating_system.lower() in action: + raise_on_error = action.get("raise_on_error", raise_on_error) - if "exit" in command_template and exit is None: - exit = command_template["exit"] + os_data = action.get(self.operating_system.lower()) + if isinstance(os_data, list): + commands = os_data + req_params = [] + elif isinstance(os_data, dict): + if "command" in os_data or "commands" in os_data: + commands = os_data.get("commands", [os_data.get("command")]) + req_params = os_data.get("params", []) + raise_on_error = os_data.get("raise_on_error", raise_on_error) + elif self.desktop_environment in os_data: + commands = os_data.get(self.desktop_environment).get("commands", [ + os_data.get(self.desktop_environment).get("command")]) + req_params = os_data.get(self.desktop_environment).get("params", []) + raise_on_error = os_data.get(self.desktop_environment).get("raise_on_error", raise_on_error) + else: + raise ValueError(f"Action {id} not supported for DE: {self.desktop_environment}") + else: + raise ValueError(f"Action {id} misconfigured/bad format") - commands = command_template.get(self.operating_system.lower()) - if not commands: - raise ValueError(f"Action not supported for OS: {self.operating_system}") + if sorted(req_params) != sorted(params.keys()): + raise ValueError(f"Invalid/missing parameters for action: {id}") - command_result = None - for command in commands: - for param, value in params.items(): - command = command.replace(f"%{param}%", value) + command_result = None + for command in commands: + for param, value in params.items(): + command = command.replace(f"%{param}%", value) - command_result = await self.run_manually(command) - if command_result.successful(): - LOGGER.debug(f"Command successful: {command}") - return command_result + command_result = await self.run_manually(command) + if command_result.successful(): + LOGGER.debug(f"Command successful: {command}") + return command_result + else: + LOGGER.debug(f"Command failed (raise: {raise_on_error}) : {command}") + if raise_on_error: + raise ValueError(f"Command failed: {command}") - LOGGER.debug(f"Command failed: {command}") + return command_result - if exit: - raise ValueError(f"Failed to run action: {action}") - - return command_result + else: + raise ValueError(f"Action {id} not supported for OS: {self.operating_system}") async def run_manually(self, command: str) -> CommandOutput: """Run a custom command manually via SSH.""" diff --git a/custom_components/easy_computer_manager/computer/formatter.py b/custom_components/easy_computer_manager/computer/formatter.py new file mode 100644 index 0000000..2e0c559 --- /dev/null +++ b/custom_components/easy_computer_manager/computer/formatter.py @@ -0,0 +1,22 @@ +def format_gnome_monitors_args(monitors_config: dict): + args = [] + + monitors_config = monitors_config.get('monitors_config', {}) + + for monitor, settings in monitors_config.items(): + if settings.get('enabled', False): + args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor]) + + if 'position' in settings: + args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])]) + + if 'mode' in settings: + args.extend(['-m', settings["mode"]]) + + if 'scale' in settings: + args.extend(['-s', str(settings["scale"])]) + + if 'transform' in settings: + args.extend(['-t', settings["transform"]]) + + return ' '.join(args) diff --git a/custom_components/easy_computer_manager/computer/parser.py b/custom_components/easy_computer_manager/computer/parser.py new file mode 100644 index 0000000..3444e19 --- /dev/null +++ b/custom_components/easy_computer_manager/computer/parser.py @@ -0,0 +1,168 @@ +import re + +from custom_components.easy_computer_manager import LOGGER +from custom_components.easy_computer_manager.computer import CommandOutput + + +def parse_gnome_monitors_output(config: str) -> list: + """ + Parse the GNOME monitors configuration. + + :param config: + The output of the gnome-monitor-config list command. + + :type config: str + + :returns: list + The parsed monitors configuration. + """ + + monitors = [] + current_monitor = None + + for line in config.split('\n'): + monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line) + if monitor_match: + if current_monitor: + monitors.append(current_monitor) + source, status = monitor_match.groups() + current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []} + elif current_monitor: + display_name_match = re.match(r'^\s+display-name: (.+)$', line) + resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line) + if display_name_match: + current_monitor['names'].append(display_name_match.group(1).replace('"', '')) + elif resolution_match: + # Don't include resolutions under 1280x720 + if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280: + + # If there are already resolutions in the list, check if the framerate between the last is >1 + if len(current_monitor['resolutions']) > 0: + last_resolution = current_monitor['resolutions'][-1] + last_resolution_size = last_resolution.split('@')[0] + this_resolution_size = resolution_match.group(1).split('@')[0] + + # Only truncate some framerates if the resolution are the same + if last_resolution_size == this_resolution_size: + last_resolution_framerate = float(last_resolution.split('@')[1]) + this_resolution_framerate = float(resolution_match.group(1).split('@')[1]) + + # If the difference between the last resolution framerate and this one is >1, ignore it + if last_resolution_framerate - 1 > this_resolution_framerate: + current_monitor['resolutions'].append(resolution_match.group(1)) + else: + # If the resolution is different, this adds the new resolution + # to the list without truncating + current_monitor['resolutions'].append(resolution_match.group(1)) + else: + # This is the first resolution, add it to the list + current_monitor['resolutions'].append(resolution_match.group(1)) + + if current_monitor: + monitors.append(current_monitor) + + return monitors + + +def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]: + """ + Parse the pactl audio configuration. + + :param config_speakers: + The output of the pactl list sinks command. + :param config_microphones: + The output of the pactl list sources command. + + :type config_speakers: str + :type config_microphones: str + + :returns: dict + The parsed audio configuration. + + """ + + config = {'speakers': [], 'microphones': []} + + def parse_device_info(lines, device_type): + devices = [] + current_device = {} + + for line in lines: + if line.startswith(f"{device_type} #"): + if current_device and "Monitor" not in current_device['description']: + devices.append(current_device) + current_device = {'id': int(re.search(r'#(\d+)', line).group(1))} + elif line.startswith(" Name:"): + current_device['name'] = line.split(":")[1].strip() + elif line.startswith(" State:"): + current_device['state'] = line.split(":")[1].strip() + elif line.startswith(" Description:"): + current_device['description'] = line.split(":")[1].strip() + + if current_device: + devices.append(current_device) + + return devices + + config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink') + config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source') + + return config + + +def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True, + return_as_string: bool = False) -> list | str: + """Parse the bluetoothctl info command. + + :param command: + The command output. + :param connected_devices_only: + Only return connected devices. + Will return all devices, connected or not, if False. + + :type command: :class: CommandOutput + :type connected_devices_only: bool + + :returns: str | list + The parsed bluetooth devices. + + """ + + if not command.successful(): + if command.output.__contains__("Missing device address argument"): # Means no devices are connected + return "" if return_as_string else [] + else: + LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed") + return "" if return_as_string else [] + + devices = [] + current_device = None + + for line in command.output.split('\n'): + if line.startswith('Device'): + if current_device is not None: + devices.append({ + "address": current_device, + "name": current_name, + "connected": current_connected + }) + current_device = line.split()[1] + current_name = None + current_connected = None + elif 'Name:' in line: + current_name = line.split(': ', 1)[1] + elif 'Connected:' in line: + current_connected = line.split(': ')[1] == 'yes' + + # Add the last device if any + if current_device is not None: + devices.append({ + "address": current_device, + "name": current_name, + "connected": current_connected + }) + + if connected_devices_only: + devices = [device for device in devices if device["connected"] == True] + + return devices diff --git a/custom_components/easy_computer_manager/computer/utils.py b/custom_components/easy_computer_manager/computer/utils.py index 8f3c468..28c14bd 100644 --- a/custom_components/easy_computer_manager/computer/utils.py +++ b/custom_components/easy_computer_manager/computer/utils.py @@ -1,9 +1,3 @@ -import re - -from custom_components.easy_computer_manager import LOGGER -from custom_components.easy_computer_manager.computer.common import CommandOutput - - async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?) """Return debug information about the host system.""" @@ -35,165 +29,11 @@ async def format_debug_information(computer: 'Computer'): # importing Computer return data -def parse_gnome_monitors_output(config: str) -> list: - """ - Parse the GNOME monitors configuration. +def get_bluetooth_devices_as_str(computer: 'Computer') -> str: + """Return the bluetooth devices as a string.""" + devices = computer.bluetooth_devices - :param config: - The output of the gnome-monitor-config list command. + if not devices: + return "" - :type config: str - - :returns: list - The parsed monitors configuration. - """ - - monitors = [] - current_monitor = None - - for line in config.split('\n'): - monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line) - if monitor_match: - if current_monitor: - monitors.append(current_monitor) - source, status = monitor_match.groups() - current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []} - elif current_monitor: - display_name_match = re.match(r'^\s+display-name: (.+)$', line) - resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line) - if display_name_match: - current_monitor['names'].append(display_name_match.group(1).replace('"', '')) - elif resolution_match: - # Don't include resolutions under 1280x720 - if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280: - - # If there are already resolutions in the list, check if the framerate between the last is >1 - if len(current_monitor['resolutions']) > 0: - last_resolution = current_monitor['resolutions'][-1] - last_resolution_size = last_resolution.split('@')[0] - this_resolution_size = resolution_match.group(1).split('@')[0] - - # Only truncate some framerates if the resolution are the same - if last_resolution_size == this_resolution_size: - last_resolution_framerate = float(last_resolution.split('@')[1]) - this_resolution_framerate = float(resolution_match.group(1).split('@')[1]) - - # If the difference between the last resolution framerate and this one is >1, ignore it - if last_resolution_framerate - 1 > this_resolution_framerate: - current_monitor['resolutions'].append(resolution_match.group(1)) - else: - # If the resolution is different, this adds the new resolution - # to the list without truncating - current_monitor['resolutions'].append(resolution_match.group(1)) - else: - # This is the first resolution, add it to the list - current_monitor['resolutions'].append(resolution_match.group(1)) - - if current_monitor: - monitors.append(current_monitor) - - return monitors - - -def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]: - """ - Parse the pactl audio configuration. - - :param config_speakers: - The output of the pactl list sinks command. - :param config_microphones: - The output of the pactl list sources command. - - :type config_speakers: str - :type config_microphones: str - - :returns: dict - The parsed audio configuration. - - """ - - config = {'speakers': [], 'microphones': []} - - def parse_device_info(lines, device_type): - devices = [] - current_device = {} - - for line in lines: - if line.startswith(f"{device_type} #"): - if current_device and "Monitor" not in current_device['description']: - devices.append(current_device) - current_device = {'id': int(re.search(r'#(\d+)', line).group(1))} - elif line.startswith(" Name:"): - current_device['name'] = line.split(":")[1].strip() - elif line.startswith(" State:"): - current_device['state'] = line.split(":")[1].strip() - elif line.startswith(" Description:"): - current_device['description'] = line.split(":")[1].strip() - - if current_device: - devices.append(current_device) - - return devices - - config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink') - config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source') - - return config - - -def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True, - return_as_string: bool = False) -> list | str: - """Parse the bluetoothctl info command. - - :param command: - The command output. - :param connected_devices_only: - Only return connected devices. - Will return all devices, connected or not, if False. - - :type command: :class: CommandOutput - :type connected_devices_only: bool - - :returns: str | list - The parsed bluetooth devices. - - """ - - if not command.successful(): - if command.output.__contains__("Missing device address argument"): # Means no devices are connected - return "" if return_as_string else [] - else: - LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed") - return "" if return_as_string else [] - - devices = [] - current_device = None - - for line in command.output.split('\n'): - if line.startswith('Device'): - if current_device is not None: - devices.append({ - "address": current_device, - "name": current_name, - "connected": current_connected - }) - current_device = line.split()[1] - current_name = None - current_connected = None - elif 'Name:' in line: - current_name = line.split(': ', 1)[1] - elif 'Connected:' in line: - current_connected = line.split(': ')[1] == 'yes' - - # Add the last device if any - if current_device is not None: - devices.append({ - "address": current_device, - "name": current_name, - "connected": current_connected - }) - - if connected_devices_only: - devices = [device for device in devices if device["connected"] == "yes"] - - return devices + return "; ".join([f"{device['name']} ({device['address']})" for device in devices]) diff --git a/custom_components/easy_computer_manager/const.py b/custom_components/easy_computer_manager/const.py index f85ffe8..f7e8691 100644 --- a/custom_components/easy_computer_manager/const.py +++ b/custom_components/easy_computer_manager/const.py @@ -42,12 +42,22 @@ ACTIONS = { "sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"] }, "set_grub_entry": { - "params": ["grub-entry"], - "linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"] + "linux": { + "commands": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"], + "params": ["grub-entry"], + } }, "get_monitors_config": { "linux": ["gnome-monitor-config list"] }, + "set_monitors_config": { + "linux": { + "gnome": { + "command": "gnome-monitor-config set %args%", + "params": ["args"] + } + } + }, "get_speakers": { "linux": ["LANG=en_US.UTF-8 pactl list sinks"] }, @@ -55,7 +65,9 @@ ACTIONS = { "linux": ["LANG=en_US.UTF-8 pactl list sources"] }, "get_bluetooth_devices": { - "exit": False, - "linux": ["bluetoothctl info"] + "linux": { + "command": "bluetoothctl info", + "raise_on_error": False, + } } } \ No newline at end of file diff --git a/custom_components/easy_computer_manager/switch.py b/custom_components/easy_computer_manager/switch.py index 33da57c..ee54c50 100644 --- a/custom_components/easy_computer_manager/switch.py +++ b/custom_components/easy_computer_manager/switch.py @@ -25,7 +25,7 @@ from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from .computer import OSType, Computer -from .computer.utils import format_debug_information +from .computer.utils import format_debug_information, get_bluetooth_devices_as_str from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \ SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \ SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN @@ -160,55 +160,6 @@ class ComputerSwitch(SwitchEntity): self._state = False self.async_write_ha_state() - # Services - async def restart_to_windows_from_linux(self) -> None: - """Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" - await self.computer.restart(OSType.LINUX, OSType.WINDOWS) - - async def restart_to_linux_from_windows(self) -> None: - """Restart the computer to Linux from a running Windows by setting grub-reboot and restarting.""" - await self.computer.restart() - - async def put_computer_to_sleep(self) -> None: - await self.computer.put_to_sleep() - - async def start_computer_to_windows(self) -> None: - async def wait_task(): - while not self.is_on: - await asyncio.sleep(3) - - await self.computer.restart(OSType.LINUX, OSType.WINDOWS) - - """Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart.""" - await self.computer.start() - self._hass.loop.create_task(wait_task()) - - async def restart_computer(self) -> None: - await self.computer.restart() - - async def change_monitors_config(self, monitors_config: dict | None = None) -> None: - """Change the monitors configuration using a YAML config file.""" - await self.computer.change_monitors_config(monitors_config) - - async def steam_big_picture(self, action: str) -> None: - match action: - case "start": - await self.computer.start_steam_big_picture() - case "stop": - await self.computer.stop_steam_big_picture() - case "exit": - await self.computer.exit_steam_big_picture() - - async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, - input_device: str | None = None, - output_device: str | None = None) -> None: - """Change the audio configuration using a YAML config file.""" - await self.computer.change_audio_config(volume, mute, input_device, output_device) - - async def debug_info(self) -> ServiceResponse: - """Prints debug info.""" - return await format_debug_information(self.computer) - async def async_update(self) -> None: """Ping the computer to see if it is online and update the state.""" self._state = await self.computer.is_on() @@ -222,6 +173,52 @@ class ComputerSwitch(SwitchEntity): "operating_system_version": self.computer.operating_system_version, "mac_address": self.computer.mac, "ip_address": self.computer.host, - "connected_devices": self.computer.bluetooth_devices, + "connected_devices": get_bluetooth_devices_as_str(self.computer), } + # Services + async def restart_to_windows_from_linux(self) -> None: + """(Service Handler) Restart the computer to Windows from a running Linux by setting grub-reboot and restarting.""" + await self.computer.restart(OSType.LINUX, OSType.WINDOWS) + + async def restart_to_linux_from_windows(self) -> None: + """(Service Handler) Restart the computer to Linux from a running Windows by setting grub-reboot and restarting.""" + await self.computer.restart() + + async def put_computer_to_sleep(self) -> None: + """(Service Handler) Put the computer to sleep.""" + await self.computer.put_to_sleep() + + async def start_computer_to_windows(self) -> None: + """(Service Handler) Start the computer to Windows""" + async def wait_task(): + while not self.is_on: + await asyncio.sleep(3) + + await self.computer.restart(OSType.LINUX, OSType.WINDOWS) + + """Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart.""" + await self.computer.start() + self._hass.loop.create_task(wait_task()) + + async def restart_computer(self) -> None: + """(Service Handler) Restart the computer.""" + await self.computer.restart() + + async def change_monitors_config(self, monitors_config: dict | None = None) -> None: + """(Service Handler) Change the monitors configuration using a YAML config file.""" + await self.computer.set_monitors_config(monitors_config) + + async def steam_big_picture(self, action: str) -> None: + """(Service Handler) Control Steam Big Picture mode.""" + await self.computer.steam_big_picture(action) + + async def change_audio_config(self, volume: int | None = None, mute: bool | None = None, + input_device: str | None = None, + output_device: str | None = None) -> None: + """(Service Handler) Change the audio configuration using a YAML config file.""" + await self.computer.set_audio_config(volume, mute, input_device, output_device) + + async def debug_info(self) -> ServiceResponse: + """(Service Handler) Prints debug info.""" + return await format_debug_information(self.computer)