Compare commits
No commits in common. "3fc5fb948ff552272c6706a2def671b59ff97c9c" and "e67e29facc881082d7577d851434c52862945947" have entirely different histories.
3fc5fb948f
...
e67e29facc
@ -16,7 +16,7 @@ from homeassistant.core import HomeAssistant, ServiceCall
|
|||||||
|
|
||||||
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
||||||
vol.Required(CONF_MAC): cv.string,
|
vol.Required(CONF_MAC): cv.string,
|
||||||
@ -40,7 +40,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
if broadcast_port is not None:
|
if broadcast_port is not None:
|
||||||
service_kwargs["port"] = broadcast_port
|
service_kwargs["port"] = broadcast_port
|
||||||
|
|
||||||
LOGGER.info(
|
_LOGGER.info(
|
||||||
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
||||||
mac_address,
|
mac_address,
|
||||||
broadcast_address,
|
broadcast_address,
|
||||||
|
@ -45,32 +45,27 @@ class Computer:
|
|||||||
port=self._port,
|
port=self._port,
|
||||||
known_hosts=None
|
known_hosts=None
|
||||||
)
|
)
|
||||||
# TODO: implement key auth client_keys=['my_ssh_key'] (mixed field detect if key or pass)
|
|
||||||
asyncssh.set_log_level("ERROR")
|
|
||||||
return client
|
return client
|
||||||
except (OSError, asyncssh.Error) as exc:
|
except (OSError, asyncssh.Error) as exc:
|
||||||
_LOGGER.error(f"SSH connection failed: {exc}")
|
_LOGGER.error(f"SSH connection failed: {exc}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _close_ssh_connection(self) -> None:
|
async def _close_ssh_connection(self, client: asyncssh.SSHClientConnection) -> None:
|
||||||
"""Close the SSH connection."""
|
"""Close the SSH connection."""
|
||||||
if self._connection is not None:
|
client.close()
|
||||||
self._connection.close()
|
|
||||||
await self._connection.wait_closed()
|
|
||||||
|
|
||||||
async def update(self) -> None:
|
async def update(self) -> None:
|
||||||
"""Setup method that opens an SSH connection and runs setup commands asynchronously."""
|
"""Setup method that opens an SSH connection and runs setup commands asynchronously."""
|
||||||
|
|
||||||
# Open SSH connection or renew it if it is closed
|
# Open SSH connection or renew it if it is closed
|
||||||
if self._connection is None or self._connection.is_closed:
|
if self._connection is None or self._connection.is_closed:
|
||||||
await self._close_ssh_connection()
|
|
||||||
self._connection = await self._open_ssh_connection()
|
self._connection = await self._open_ssh_connection()
|
||||||
|
|
||||||
self._operating_system = OSType.LINUX if (await self.run_manually("uname")).get(
|
self._operating_system = OSType.LINUX if (await self.run_manually("uname")).get(
|
||||||
"return_code") == 0 else OSType.WINDOWS # TODO: improve this
|
"return_code") == 0 else OSType.WINDOWS # TODO: improve this
|
||||||
self._operating_system_version = (await self.run_action("operating_system_version")).get("output")
|
self._operating_system_version = (await self.run_action("operating_system_version")).get("output")
|
||||||
self._windows_entry_grub = (await self.run_action("get_windows_entry_grub")).get("output")
|
self._windows_entry_grub = (await self.run_action("get_windows_entry_grub")).get("output")
|
||||||
self._monitors_config = (await self.run_action("get_monitors_config")).get("output")
|
self._monitors_config = {}
|
||||||
self._audio_config = {'speakers': None, 'microphones': None}
|
self._audio_config = {'speakers': None, 'microphones': None}
|
||||||
self._bluetooth_devices = {}
|
self._bluetooth_devices = {}
|
||||||
|
|
||||||
@ -160,8 +155,8 @@ class Computer:
|
|||||||
if params is None:
|
if params is None:
|
||||||
params = {}
|
params = {}
|
||||||
|
|
||||||
if action in const.ACTIONS:
|
if action in const.COMMANDS:
|
||||||
command_template = const.ACTIONS[action]
|
command_template = const.COMMANDS[action]
|
||||||
|
|
||||||
# Check if the command has the required parameters
|
# Check if the command has the required parameters
|
||||||
if "params" in command_template:
|
if "params" in command_template:
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
import re
|
|
||||||
|
|
||||||
from custom_components.easy_computer_manager.computer import Computer
|
from custom_components.easy_computer_manager.computer import Computer
|
||||||
|
|
||||||
|
|
||||||
async def format_debug_informations(computer: Computer):
|
async def get_debug_info(computer: Computer):
|
||||||
"""Return debug information about the host system."""
|
"""Return debug information about the host system."""
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
@ -31,53 +29,3 @@ async def format_debug_informations(computer: Computer):
|
|||||||
}
|
}
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def parse_gnome_monitors_config(config: str):
|
|
||||||
"""Parse the GNOME monitors configuration."""
|
|
||||||
|
|
||||||
monitors = []
|
|
||||||
current_monitor = None
|
|
||||||
|
|
||||||
for line in config.split('\n'):
|
|
||||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
|
||||||
if monitor_match:
|
|
||||||
if current_monitor:
|
|
||||||
monitors.append(current_monitor)
|
|
||||||
source, status = monitor_match.groups()
|
|
||||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
|
||||||
elif current_monitor:
|
|
||||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
|
||||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
|
||||||
if display_name_match:
|
|
||||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
|
||||||
elif resolution_match:
|
|
||||||
# Don't include resolutions under 1280x720
|
|
||||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
|
||||||
|
|
||||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
|
||||||
if len(current_monitor['resolutions']) > 0:
|
|
||||||
last_resolution = current_monitor['resolutions'][-1]
|
|
||||||
last_resolution_size = last_resolution.split('@')[0]
|
|
||||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
|
||||||
|
|
||||||
# Only truncate some framerates if the resolution are the same
|
|
||||||
if last_resolution_size == this_resolution_size:
|
|
||||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
|
||||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
|
||||||
|
|
||||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
|
||||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
else:
|
|
||||||
# If the resolution is different, this adds the new resolution
|
|
||||||
# to the list without truncating
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
else:
|
|
||||||
# This is the first resolution, add it to the list
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
|
|
||||||
if current_monitor:
|
|
||||||
monitors.append(current_monitor)
|
|
||||||
|
|
||||||
return monitors
|
|
||||||
|
@ -7,6 +7,7 @@ from typing import Any
|
|||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
from homeassistant import config_entries, exceptions
|
from homeassistant import config_entries, exceptions
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
from paramiko.ssh_exception import AuthenticationException
|
||||||
|
|
||||||
from .computer import Computer
|
from .computer import Computer
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
@ -53,7 +54,7 @@ class Hub:
|
|||||||
_LOGGER.info("Testing connection to %s", self._host)
|
_LOGGER.info("Testing connection to %s", self._host)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception:
|
except AuthenticationException:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -85,7 +86,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
try:
|
try:
|
||||||
info = await validate_input(self.hass, user_input)
|
info = await validate_input(self.hass, user_input)
|
||||||
return self.async_create_entry(title=info["title"], data=user_input)
|
return self.async_create_entry(title=info["title"], data=user_input)
|
||||||
except (CannotConnect, InvalidHost) as ex:
|
except (AuthenticationException, CannotConnect, InvalidHost) as ex:
|
||||||
errors["base"] = str(ex)
|
errors["base"] = str(ex)
|
||||||
except Exception as ex: # pylint: disable=broad-except
|
except Exception as ex: # pylint: disable=broad-except
|
||||||
_LOGGER.exception("Unexpected exception: %s", ex)
|
_LOGGER.exception("Unexpected exception: %s", ex)
|
||||||
|
@ -13,7 +13,7 @@ SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
|
|||||||
SERVICE_DEBUG_INFO = "debug_info"
|
SERVICE_DEBUG_INFO = "debug_info"
|
||||||
|
|
||||||
|
|
||||||
ACTIONS = {
|
COMMANDS = {
|
||||||
"operating_system": {
|
"operating_system": {
|
||||||
"linux": ["uname"]
|
"linux": ["uname"]
|
||||||
},
|
},
|
||||||
@ -40,18 +40,5 @@ ACTIONS = {
|
|||||||
"set_grub_entry": {
|
"set_grub_entry": {
|
||||||
"params": ["grub-entry"],
|
"params": ["grub-entry"],
|
||||||
"linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"]
|
"linux": ["sudo grub-reboot %grub-entry%", "sudo grub2-reboot %grub-entry%"]
|
||||||
},
|
|
||||||
"get_monitors_config": {
|
|
||||||
"linux": ["gnome-monitor-config list"]
|
|
||||||
},
|
|
||||||
"get_speakers": {
|
|
||||||
"linux": ["pactl list sinks"]
|
|
||||||
},
|
|
||||||
"get_microphones": {
|
|
||||||
"linux": ["pactl list sources"]
|
|
||||||
},
|
|
||||||
"get_bluetooth_devices": {
|
|
||||||
"exit": False,
|
|
||||||
"linux": ["bluetoothctl info"]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -3,6 +3,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
@ -30,6 +31,8 @@ from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_T
|
|||||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
||||||
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(
|
async def async_setup_entry(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
@ -207,7 +210,7 @@ class ComputerSwitch(SwitchEntity):
|
|||||||
|
|
||||||
async def debug_info(self) -> ServiceResponse:
|
async def debug_info(self) -> ServiceResponse:
|
||||||
"""Prints debug info."""
|
"""Prints debug info."""
|
||||||
return await computer_utils.format_debug_informations(self.computer)
|
return await computer_utils.get_debug_info(self.computer)
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
async def async_update(self) -> None:
|
||||||
"""Ping the computer to see if it is online and update the state."""
|
"""Ping the computer to see if it is online and update the state."""
|
||||||
@ -222,6 +225,6 @@ class ComputerSwitch(SwitchEntity):
|
|||||||
"operating_system_version": self.computer.get_operating_system_version(),
|
"operating_system_version": self.computer.get_operating_system_version(),
|
||||||
"mac_address": self.computer.mac,
|
"mac_address": self.computer.mac,
|
||||||
"ip_address": self.computer.host,
|
"ip_address": self.computer.host,
|
||||||
"connected_devices": self.computer.get_bluetooth_devices(return_as_string=True),
|
"connected_devices": self.computer.get_bluetooth_devices(as_str=True),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
485
custom_components/easy_computer_manager/utils.py
Normal file
485
custom_components/easy_computer_manager/utils.py
Normal file
@ -0,0 +1,485 @@
|
|||||||
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
|
import fabric2
|
||||||
|
from fabric2 import Connection
|
||||||
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# _LOGGER.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def is_unix_system(connection: Connection):
|
||||||
|
"""Return a boolean based on get_operating_system result."""
|
||||||
|
return get_operating_system(connection) == "Linux/Unix"
|
||||||
|
|
||||||
|
|
||||||
|
def get_operating_system_version(connection: Connection, is_unix=None):
|
||||||
|
"""Return the running operating system name and version."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
if is_unix:
|
||||||
|
result = connection.run(
|
||||||
|
"awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo").stdout
|
||||||
|
if result == "":
|
||||||
|
result = connection.run("lsb_release -a | awk '/Description/ {print $2, $3, $4}'").stdout
|
||||||
|
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return connection.run(
|
||||||
|
'for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i').stdout
|
||||||
|
|
||||||
|
|
||||||
|
def get_operating_system(connection: Connection):
|
||||||
|
"""Return the running operating system type."""
|
||||||
|
# TODO: might be a better way to do this
|
||||||
|
result = connection.run("uname")
|
||||||
|
if result.return_code == 0:
|
||||||
|
return "Linux/Unix"
|
||||||
|
else:
|
||||||
|
return "Windows/Other"
|
||||||
|
|
||||||
|
|
||||||
|
def shutdown_system(connection: Connection, is_unix=None):
|
||||||
|
"""Shutdown the system."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
shutdown_commands = {
|
||||||
|
"unix": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"],
|
||||||
|
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in shutdown_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System shutting down on %s.", connection.host)
|
||||||
|
connection.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot shutdown system running at {connection.host}, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def restart_system(connection: Connection, is_unix=None):
|
||||||
|
"""Restart the system."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
restart_commands = {
|
||||||
|
"unix": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"],
|
||||||
|
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in restart_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System restarting on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot restart system running at {connection.host}, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def sleep_system(connection: Connection, is_unix=None):
|
||||||
|
"""Put the system to sleep."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
sleep_commands = {
|
||||||
|
"unix": ["sudo systemctl suspend", "sudo pm-suspend"],
|
||||||
|
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in sleep_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System sleeping on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot put system running at {connection.host} to sleep, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_windows_entry_in_grub(connection: Connection):
|
||||||
|
"""
|
||||||
|
Grabs the Windows entry name in GRUB.
|
||||||
|
Used later with grub-reboot to specify which entry to boot.
|
||||||
|
"""
|
||||||
|
commands = [
|
||||||
|
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
|
||||||
|
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"
|
||||||
|
]
|
||||||
|
|
||||||
|
for command in commands:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0 and result.stdout.strip():
|
||||||
|
_LOGGER.debug("Found Windows entry in GRUB: " + result.stdout.strip())
|
||||||
|
return result.stdout.strip()
|
||||||
|
|
||||||
|
_LOGGER.error("Could not find Windows entry in GRUB for system running at %s.", connection.host)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def restart_to_windows_from_linux(connection: Connection):
|
||||||
|
"""Restart a running Linux system to Windows."""
|
||||||
|
|
||||||
|
if not is_unix_system(connection):
|
||||||
|
raise HomeAssistantError(f"System running at {connection.host} is not a Linux system.")
|
||||||
|
|
||||||
|
windows_entry = get_windows_entry_in_grub(connection)
|
||||||
|
|
||||||
|
if windows_entry is not None:
|
||||||
|
reboot_commands = ["sudo grub-reboot", "sudo grub2-reboot"]
|
||||||
|
|
||||||
|
for reboot_command in reboot_commands:
|
||||||
|
result = connection.run(f"{reboot_command} \"{windows_entry}\"")
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("Rebooting to Windows")
|
||||||
|
restart_system(connection)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Failed to restart system running on {connection.host} to Windows from Linux.")
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError(f"Could not find Windows entry in grub for system running at {connection.host}.")
|
||||||
|
|
||||||
|
|
||||||
|
def change_monitors_config(connection: Connection, monitors_config: dict):
|
||||||
|
"""Change monitors configuration on the host (Linux + Gnome, and partial Windows support)."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
command_parts = ["gnome-monitor-config", "set"]
|
||||||
|
|
||||||
|
for monitor, settings in monitors_config.items():
|
||||||
|
if settings.get('enabled', False):
|
||||||
|
command_parts.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||||
|
|
||||||
|
if 'position' in settings:
|
||||||
|
command_parts.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||||
|
|
||||||
|
if 'mode' in settings:
|
||||||
|
command_parts.extend(['-m', settings["mode"]])
|
||||||
|
|
||||||
|
if 'scale' in settings:
|
||||||
|
command_parts.extend(['-s', str(settings["scale"])])
|
||||||
|
|
||||||
|
if 'transform' in settings:
|
||||||
|
command_parts.extend(['-t', settings["transform"]])
|
||||||
|
|
||||||
|
command = ' '.join(command_parts)
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
|
||||||
|
result = connection.run(command)
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||||
|
|
||||||
|
# Run it once again, it fixes some strange Gnome display bug sometimes and it doesn't hurt
|
||||||
|
connection.run(command)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||||
|
connection.host)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
# TODO: Implement Windows support using NIRCMD
|
||||||
|
command_parts = ["nircmd.exe", "setdisplay"]
|
||||||
|
# setdisplay {monitor:index/name} [width] [height] [color bits] {refresh rate} {-updatereg} {-allusers}
|
||||||
|
|
||||||
|
for monitor, settings in monitors_config.items():
|
||||||
|
if settings.get('enabled', False):
|
||||||
|
command_parts.extend(
|
||||||
|
[f'{monitor} -primary' if settings.get('primary', False) else f'{monitor} -secondary'])
|
||||||
|
|
||||||
|
if 'resolution' in settings:
|
||||||
|
command_parts.extend([str(settings["resolution"][0]), str(settings["resolution"][1])])
|
||||||
|
|
||||||
|
if 'refresh_rate' in settings:
|
||||||
|
command_parts.extend(['-hz', str(settings["refresh_rate"])])
|
||||||
|
|
||||||
|
if 'color_bits' in settings:
|
||||||
|
command_parts.extend(['-bits', str(settings["color_bits"])])
|
||||||
|
|
||||||
|
command = ' '.join(command_parts)
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
|
||||||
|
result = connection.run_action(command)
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||||
|
connection.host)
|
||||||
|
|
||||||
|
|
||||||
|
def silent_install_nircmd(connection: Connection):
|
||||||
|
"""Silently install NIRCMD on a Windows system."""
|
||||||
|
|
||||||
|
if not is_unix_system(connection):
|
||||||
|
download_url = "https://www.nirsoft.net/utils/nircmd.zip"
|
||||||
|
install_path = f"C:\\Users\\{connection.user}\\AppData\\Local\\EasyComputerManager"
|
||||||
|
|
||||||
|
# Download and unzip NIRCMD
|
||||||
|
download_command = f"powershell -Command \"Invoke-WebRequest -Uri {download_url} -OutFile {install_path}\\nircmd.zip -UseBasicParsing\""
|
||||||
|
unzip_command = f"powershell -Command \"Expand-Archive {install_path}\\nircmd.zip -DestinationPath {install_path}\""
|
||||||
|
remove_zip_command = f"powershell -Command \"Remove-Item {install_path}\\nircmd.zip\""
|
||||||
|
|
||||||
|
commands = [download_command, unzip_command, remove_zip_command]
|
||||||
|
|
||||||
|
for command in commands:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code != 0:
|
||||||
|
_LOGGER.error("Could not install NIRCMD on system running on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def get_monitors_config(connection: Connection) -> dict:
|
||||||
|
"""Parse the output of the gnome-monitor-config command to get the current monitor configuration."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run("gnome-monitor-config list")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get monitors config on system running at {connection.host}. "
|
||||||
|
f"Make sure gnome-monitor-config package is installed.")
|
||||||
|
|
||||||
|
monitors = []
|
||||||
|
current_monitor = None
|
||||||
|
|
||||||
|
for line in result.stdout.split('\n'):
|
||||||
|
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||||
|
if monitor_match:
|
||||||
|
if current_monitor:
|
||||||
|
monitors.append(current_monitor)
|
||||||
|
source, status = monitor_match.groups()
|
||||||
|
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||||
|
elif current_monitor:
|
||||||
|
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||||
|
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||||
|
if display_name_match:
|
||||||
|
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||||
|
elif resolution_match:
|
||||||
|
# Don't include resolutions under 1280x720
|
||||||
|
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||||
|
|
||||||
|
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||||
|
if len(current_monitor['resolutions']) > 0:
|
||||||
|
last_resolution = current_monitor['resolutions'][-1]
|
||||||
|
last_resolution_size = last_resolution.split('@')[0]
|
||||||
|
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||||
|
|
||||||
|
# Only truncate some framerates if the resolution are the same
|
||||||
|
if last_resolution_size == this_resolution_size:
|
||||||
|
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||||
|
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||||
|
|
||||||
|
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||||
|
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
else:
|
||||||
|
# If the resolution is different, this adds the new resolution
|
||||||
|
# to the list without truncating
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
else:
|
||||||
|
# This is the first resolution, add it to the list
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
|
||||||
|
if current_monitor:
|
||||||
|
monitors.append(current_monitor)
|
||||||
|
|
||||||
|
return monitors
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def steam_big_picture(connection: Connection, action: str):
|
||||||
|
"""Controls Steam in Big Picture mode on the host."""
|
||||||
|
|
||||||
|
_LOGGER.debug(f"Running Steam Big Picture action {action} on system running at {connection.host}.")
|
||||||
|
|
||||||
|
steam_commands = {
|
||||||
|
"start": {
|
||||||
|
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||||
|
"windows": "start steam://open/bigpicture"
|
||||||
|
},
|
||||||
|
"stop": {
|
||||||
|
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||||
|
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||||
|
# TODO: check for different Steam install paths
|
||||||
|
},
|
||||||
|
"exit": {
|
||||||
|
"unix": None, # TODO: find a way to exit Steam Big Picture
|
||||||
|
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||||
|
# TODO: need to test (thx @MasterHidra https://www.reddit.com/r/Steam/comments/5c9l20/comment/k5fmb3k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
command = steam_commands.get(action)
|
||||||
|
|
||||||
|
if command is None:
|
||||||
|
raise HomeAssistantError(
|
||||||
|
f"Invalid action {action} for Steam Big Picture on system running at {connection.host}.")
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run(command.get("unix"))
|
||||||
|
else:
|
||||||
|
result = connection.run(command.get("windows"))
|
||||||
|
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not {action} Steam Big Picture on system running at {connection.host}.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_audio_config(connection: Connection):
|
||||||
|
if is_unix_system(connection):
|
||||||
|
config = {'sinks': [], 'sources': []}
|
||||||
|
|
||||||
|
def parse_device_info(lines, device_type):
|
||||||
|
devices = []
|
||||||
|
current_device = {}
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
if line.startswith(f"{device_type} #"):
|
||||||
|
if current_device and "Monitor" not in current_device['description']:
|
||||||
|
devices.append(current_device)
|
||||||
|
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||||
|
elif line.startswith(" Name:"):
|
||||||
|
current_device['name'] = line.split(":")[1].strip()
|
||||||
|
elif line.startswith(" State:"):
|
||||||
|
current_device['state'] = line.split(":")[1].strip()
|
||||||
|
elif line.startswith(" Description:"):
|
||||||
|
current_device['description'] = line.split(":")[1].strip()
|
||||||
|
|
||||||
|
if current_device:
|
||||||
|
devices.append(current_device)
|
||||||
|
|
||||||
|
return devices
|
||||||
|
|
||||||
|
# Get sinks
|
||||||
|
result = connection.run("LANG=en_US.UTF-8 pactl list sinks")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get audio sinks on system running at {connection.host}. "
|
||||||
|
f"Make sure pactl package is installed!")
|
||||||
|
config['sinks'] = parse_device_info(result.stdout.split('\n'), 'Sink')
|
||||||
|
|
||||||
|
# Get sources
|
||||||
|
result = connection.run("LANG=en_US.UTF-8 pactl list sources")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get audio sources on system running at {connection.host}."
|
||||||
|
f"Make sure pactl package is installed!")
|
||||||
|
config['sources'] = parse_device_info(result.stdout.split('\n'), 'Source')
|
||||||
|
|
||||||
|
return config
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def change_audio_config(connection: Connection, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||||
|
output_device: str = "@DEFAULT_SINK@"):
|
||||||
|
"""Change audio configuration on the host system."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
current_config = get_audio_config(connection)
|
||||||
|
executable = "pactl"
|
||||||
|
commands = []
|
||||||
|
|
||||||
|
def get_device_id(device_type, user_device):
|
||||||
|
for device in current_config[device_type]:
|
||||||
|
if device['description'] == user_device:
|
||||||
|
return device['name']
|
||||||
|
return user_device
|
||||||
|
|
||||||
|
# Set default sink and source if not specified
|
||||||
|
if not output_device:
|
||||||
|
output_device = "@DEFAULT_SINK@"
|
||||||
|
if not input_device:
|
||||||
|
input_device = "@DEFAULT_SOURCE@"
|
||||||
|
|
||||||
|
# Set default sink if specified
|
||||||
|
if output_device and output_device != "@DEFAULT_SINK@":
|
||||||
|
output_device = get_device_id('sinks', output_device)
|
||||||
|
commands.append(f"{executable} set-default-sink {output_device}")
|
||||||
|
|
||||||
|
# Set default source if specified
|
||||||
|
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||||
|
input_device = get_device_id('sources', input_device)
|
||||||
|
commands.append(f"{executable} set-default-source {input_device}")
|
||||||
|
|
||||||
|
# Set sink volume if specified
|
||||||
|
if volume is not None:
|
||||||
|
commands.append(f"{executable} set-sink-volume {output_device} {volume}%")
|
||||||
|
|
||||||
|
# Set sink and source mute status if specified
|
||||||
|
if mute is not None:
|
||||||
|
commands.append(f"{executable} set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||||
|
commands.append(f"{executable} set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||||
|
|
||||||
|
# Execute commands
|
||||||
|
for command in commands:
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
result = connection.run(command)
|
||||||
|
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(
|
||||||
|
f"Could not change audio config on system running on {connection.host}, check logs with debug and"
|
||||||
|
f"make sure pactl package is installed!")
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False) -> []:
|
||||||
|
commands = {
|
||||||
|
"unix": "bluetoothctl info",
|
||||||
|
"windows": "",
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run(commands["unix"])
|
||||||
|
if result.return_code != 0:
|
||||||
|
if result.stdout.__contains__("Missing device address argument"): # Means no devices are connected
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
_LOGGER.warning(f"Cannot retrieve bluetooth devices at {connection.host}. "
|
||||||
|
f"Make sure bluetoothctl is installed!")
|
||||||
|
return []
|
||||||
|
|
||||||
|
devices = []
|
||||||
|
current_device = None
|
||||||
|
|
||||||
|
for line in result.stdout.split('\n'):
|
||||||
|
if line.startswith('Device'):
|
||||||
|
if current_device is not None:
|
||||||
|
devices.append({
|
||||||
|
"address": current_device,
|
||||||
|
"name": current_name,
|
||||||
|
"connected": current_connected
|
||||||
|
})
|
||||||
|
current_device = line.split()[1]
|
||||||
|
current_name = None
|
||||||
|
current_connected = None
|
||||||
|
elif 'Name:' in line:
|
||||||
|
current_name = line.split(': ', 1)[1]
|
||||||
|
elif 'Connected:' in line:
|
||||||
|
current_connected = line.split(': ')[1] == 'yes'
|
||||||
|
|
||||||
|
# Add the last device if any
|
||||||
|
if current_device is not None:
|
||||||
|
devices.append({
|
||||||
|
"address": current_device,
|
||||||
|
"name": current_name,
|
||||||
|
"connected": current_connected
|
||||||
|
})
|
||||||
|
|
||||||
|
if only_connected:
|
||||||
|
devices = [device for device in devices if device["connected"] == "yes"]
|
||||||
|
|
||||||
|
if return_as_string:
|
||||||
|
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
||||||
|
|
||||||
|
return devices
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
Loading…
Reference in New Issue
Block a user