Compare commits
21 Commits
Author | SHA1 | Date | |
---|---|---|---|
f82826fc0a | |||
67df62ff91 | |||
59ff04775c | |||
08d9f78a35 | |||
591396895d | |||
daf2d6d63e | |||
94a65178c0 | |||
b4b0bead89 | |||
723b7b1284 | |||
eaf434efc8 | |||
767d877442 | |||
3e13d1ddb7 | |||
8f2f03f134 | |||
af870c63eb | |||
3a3775f457 | |||
3fc5fb948f | |||
bc16d06ac6 | |||
e67e29facc | |||
7e6736c8b3 | |||
0f72986d2a | |||
ab2c8e0376 |
22
.github/wiki/script-auto-config-linux.sh
vendored
22
.github/wiki/script-auto-config-linux.sh
vendored
@ -32,7 +32,7 @@ fi
|
||||
# Configure sudoers
|
||||
print_colored "$COLOR_BLUE" "Configuring sudoers..."
|
||||
echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers > /dev/null
|
||||
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/bin/systemctl, /usr/sbin/pm-suspend, /usr/bin/awk, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot" | sudo tee -a /etc/sudoers > /dev/null
|
||||
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/sbin/pm-suspend, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot, /usr/bin/cat /etc/grub2.cfg, /usr/bin/cat /etc/grub.cfg, /usr/bin/systemctl poweroff, /usr/bin/systemctl reboot, /usr/bin/systemctl suspend" | sudo tee -a /etc/sudoers > /dev/null
|
||||
print_colored "$COLOR_GREEN" "Sudoers file configured successfully."
|
||||
|
||||
# Firewall Configuration
|
||||
@ -51,16 +51,16 @@ DESKTOP_ENTRY_NAME="EasyComputerManager-AutoStart"
|
||||
DESKTOP_ENTRY_PATH="/home/$USER_BEHIND_SUDO/.config/autostart/$DESKTOP_ENTRY_NAME.desktop"
|
||||
|
||||
# Create the desktop entry file for the Desktop Environment to autostart at login every reboot
|
||||
# cat > "$DESKTOP_ENTRY_PATH" <<EOF
|
||||
# [Desktop Entry]
|
||||
# Type=Application
|
||||
# Name=$DESKTOP_ENTRY_NAME
|
||||
# Exec=sh -c '$COMMANDS'
|
||||
# Hidden=false
|
||||
# NoDisplay=false
|
||||
# X-GNOME-Autostart-enabled=true
|
||||
# EOF
|
||||
# chmod +x "$DESKTOP_ENTRY_PATH"
|
||||
cat > "$DESKTOP_ENTRY_PATH" <<EOF
|
||||
[Desktop Entry]
|
||||
Type=Application
|
||||
Name=$DESKTOP_ENTRY_NAME
|
||||
Exec=sh -c '$COMMANDS'
|
||||
Hidden=false
|
||||
NoDisplay=false
|
||||
X-GNOME-Autostart-enabled=true
|
||||
EOF
|
||||
chmod +x "$DESKTOP_ENTRY_PATH"
|
||||
print_colored "$COLOR_GREEN" "Desktop entry created at $DESKTOP_ENTRY_PATH."
|
||||
|
||||
print_colored "$COLOR_GREEN" "\nDone! Some features may require a reboot to work including:"
|
||||
|
24
README.md
24
README.md
@ -28,7 +28,11 @@ if dual-boot), adjusting audio configurations, changing monitor settings, and mo
|
||||
1. Install [HACS](https://hacs.xyz/) if not already installed.
|
||||
2. In Home Assistant, go to "HACS" in the sidebar.
|
||||
3. Click on "Integrations."
|
||||
4. Search for "easy_computer_manager" and click "Install."
|
||||
4. Click on the three dots in the top right corner and select "Custom repositories."
|
||||
5. Paste the following URL in the "Repo" field: https://github.com/M4TH1EU/HA-EasyComputerManager
|
||||
6. Select "Integration" from the "Category" dropdown.
|
||||
7. Click "Add."
|
||||
8. Search for "easy_computer_manager" and click "Install."
|
||||
|
||||
### Manually
|
||||
|
||||
@ -37,22 +41,34 @@ if dual-boot), adjusting audio configurations, changing monitor settings, and mo
|
||||
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your
|
||||
Home Assistant instance.
|
||||
|
||||
## Configuration
|
||||
# Preparing your computer (required)
|
||||
> [!CAUTION]
|
||||
> Before adding your computer to Home Assistant, ensure that it is properly configured.
|
||||
|
||||
**See wiki page [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Prepare-your-computer).**
|
||||
|
||||
## Add Computer to Home Assistant
|
||||
|
||||
1. In Home Assistant, go to "Configuration" in the sidebar.
|
||||
2. Select "Integrations."
|
||||
3. Click the "+" button to add a new integration.
|
||||
4. Search for "Easy Computer Manager" and select it from the list.
|
||||
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, username,
|
||||
and password for the computer you want to manage.
|
||||
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, mac-adress, username,
|
||||
and password for the computer you want to add.
|
||||
6. Once configured, click "Finish" to add the computer to Home Assistant.
|
||||
|
||||
> [!NOTE]
|
||||
> If you are managing a dual-boot computer, ensure that the "Dual boot system" checkbox is enabled during the configuration.
|
||||
|
||||
## Usage
|
||||
|
||||
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the
|
||||
available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your
|
||||
smart home setup.
|
||||
|
||||
## Services
|
||||
A detailed list of available services and their parameters can be found in the wiki [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Services).
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If you encounter any issues during installation or configuration, refer to the troubleshooting section in
|
||||
|
@ -16,7 +16,7 @@ from homeassistant.core import HomeAssistant, ServiceCall
|
||||
|
||||
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
||||
vol.Required(CONF_MAC): cv.string,
|
||||
@ -40,7 +40,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
if broadcast_port is not None:
|
||||
service_kwargs["port"] = broadcast_port
|
||||
|
||||
_LOGGER.info(
|
||||
LOGGER.info(
|
||||
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
||||
mac_address,
|
||||
broadcast_address,
|
||||
@ -59,11 +59,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
|
||||
)
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.async_forward_entry_setup(
|
||||
entry, "switch"
|
||||
)
|
||||
)
|
||||
await hass.config_entries.async_forward_entry_setups(entry, ["switch"])
|
||||
|
||||
return True
|
||||
|
||||
|
208
custom_components/easy_computer_manager/computer/__init__.py
Normal file
208
custom_components/easy_computer_manager/computer/__init__.py
Normal file
@ -0,0 +1,208 @@
|
||||
import asyncio
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
from wakeonlan import send_magic_packet
|
||||
|
||||
from custom_components.easy_computer_manager import const, LOGGER
|
||||
from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput
|
||||
from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args, format_pactl_commands
|
||||
from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \
|
||||
parse_bluetoothctl
|
||||
from custom_components.easy_computer_manager.computer.ssh_client import SSHClient
|
||||
|
||||
|
||||
class Computer:
|
||||
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
|
||||
dualboot: bool = False) -> None:
|
||||
"""Initialize the Computer object."""
|
||||
self.initialized = False # used to avoid duplicated ssh connections
|
||||
|
||||
self.host = host
|
||||
self.mac = mac
|
||||
self.username = username
|
||||
self._password = password
|
||||
self.port = port
|
||||
self.dualboot = dualboot
|
||||
|
||||
self.operating_system: Optional[OSType] = None
|
||||
self.operating_system_version: Optional[str] = None
|
||||
self.desktop_environment: Optional[str] = None
|
||||
self.windows_entry_grub: Optional[str] = None
|
||||
self.monitors_config: Optional[Dict[str, Any]] = None
|
||||
self.audio_config: Dict[str, Optional[Dict]] = {}
|
||||
self.bluetooth_devices: Dict[str, Any] = {}
|
||||
|
||||
self._connection: SSHClient = SSHClient(host, username, password, port)
|
||||
asyncio.create_task(self._connection.connect(computer=self))
|
||||
|
||||
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
|
||||
"""Update computer details."""
|
||||
if not state or not await self.is_on():
|
||||
LOGGER.debug("Computer is off, skipping update")
|
||||
return
|
||||
|
||||
# Ensure connection is established before updating
|
||||
await self._ensure_connection_alive(timeout)
|
||||
|
||||
# Update tasks
|
||||
await asyncio.gather(
|
||||
self._update_operating_system(),
|
||||
self._update_operating_system_version(),
|
||||
self._update_desktop_environment(),
|
||||
self._update_windows_entry_grub(),
|
||||
self._update_monitors_config(),
|
||||
self._update_audio_config(),
|
||||
self._update_bluetooth_devices()
|
||||
)
|
||||
|
||||
async def _ensure_connection_alive(self, timeout: int) -> None:
|
||||
"""Ensure the SSH connection is alive, reconnect if necessary."""
|
||||
for _ in range(timeout * 4):
|
||||
if self._connection.is_connection_alive():
|
||||
return
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Reconnecting to {self.host}")
|
||||
await self._connection.connect()
|
||||
if not self._connection.is_connection_alive():
|
||||
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
|
||||
raise ConnectionError("SSH connection could not be re-established")
|
||||
|
||||
async def _update_operating_system(self) -> None:
|
||||
"""Update the operating system information."""
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
async def _update_operating_system_version(self) -> None:
|
||||
"""Update the operating system version."""
|
||||
self.operating_system_version = (await self.run_action("operating_system_version")).output
|
||||
|
||||
async def _update_desktop_environment(self) -> None:
|
||||
"""Update the desktop environment information."""
|
||||
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
|
||||
|
||||
async def _update_windows_entry_grub(self) -> None:
|
||||
"""Update Windows entry in GRUB (if applicable)."""
|
||||
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
|
||||
|
||||
async def _update_monitors_config(self) -> None:
|
||||
"""Update monitors configuration."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
output = (await self.run_action("get_monitors_config")).output
|
||||
self.monitors_config = parse_gnome_monitors_output(output)
|
||||
# TODO: Implement for Windows if needed
|
||||
|
||||
async def _update_audio_config(self) -> None:
|
||||
"""Update audio configuration."""
|
||||
speakers_output = (await self.run_action("get_speakers")).output
|
||||
microphones_output = (await self.run_action("get_microphones")).output
|
||||
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def _update_bluetooth_devices(self) -> None:
|
||||
"""Update Bluetooth devices list."""
|
||||
if self.operating_system == OSType.LINUX:
|
||||
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
|
||||
# TODO: Implement for Windows
|
||||
|
||||
async def _detect_operating_system(self) -> OSType:
|
||||
"""Detect the operating system by running a uname command."""
|
||||
result = await self.run_manually("uname")
|
||||
return OSType.LINUX if result.successful() else OSType.WINDOWS
|
||||
|
||||
async def is_on(self, timeout: int = 1) -> bool:
|
||||
"""Check if the computer is on by pinging it."""
|
||||
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*ping_cmd,
|
||||
stdout=asyncio.subprocess.DEVNULL,
|
||||
stderr=asyncio.subprocess.DEVNULL
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the computer using Wake-on-LAN."""
|
||||
send_magic_packet(self.mac)
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""Shutdown the computer."""
|
||||
await self.run_action("shutdown")
|
||||
|
||||
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
|
||||
"""Restart the computer."""
|
||||
await self.run_action("restart")
|
||||
|
||||
async def put_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep."""
|
||||
await self.run_action("sleep")
|
||||
|
||||
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Set monitors configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
|
||||
|
||||
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
|
||||
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
|
||||
"""Set audio configuration."""
|
||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
||||
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
|
||||
for command in pactl_commands:
|
||||
await self.run_action("set_audio_config", params={"args": command})
|
||||
|
||||
async def install_nircmd(self) -> None:
|
||||
"""Install NirCmd tool (Windows specific)."""
|
||||
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
|
||||
await self.run_action("install_nircmd", params={
|
||||
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
|
||||
"install_path": install_path
|
||||
})
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""Start, stop, or exit Steam Big Picture mode."""
|
||||
await self.run_action(f"{action}_steam_big_picture")
|
||||
|
||||
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
|
||||
raise_on_error: bool = False) -> CommandOutput:
|
||||
"""Run a predefined action via SSH."""
|
||||
params = params or {}
|
||||
|
||||
action = const.ACTIONS.get(id)
|
||||
if not action:
|
||||
LOGGER.error(f"Action {id} not found.")
|
||||
return CommandOutput("", 1, "", "Action not found")
|
||||
|
||||
if not self.operating_system:
|
||||
self.operating_system = await self._detect_operating_system()
|
||||
|
||||
os_commands = action.get(self.operating_system.lower())
|
||||
if not os_commands:
|
||||
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
|
||||
|
||||
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
|
||||
[os_commands.get("command")])
|
||||
required_params = []
|
||||
if "params" in os_commands:
|
||||
required_params = os_commands.get("params", [])
|
||||
|
||||
# Validate parameters
|
||||
if sorted(required_params) != sorted(params.keys()):
|
||||
raise ValueError(f"Invalid/missing parameters for action: {id}")
|
||||
|
||||
for command in commands:
|
||||
for param, value in params.items():
|
||||
command = command.replace(f"%{param}%", str(value))
|
||||
|
||||
result = await self.run_manually(command)
|
||||
if result.successful():
|
||||
return result
|
||||
elif raise_on_error:
|
||||
raise ValueError(f"Command failed: {command}")
|
||||
|
||||
return result
|
||||
|
||||
async def run_manually(self, command: str) -> CommandOutput:
|
||||
"""Run a custom command manually via SSH."""
|
||||
return await self._connection.execute_command(command)
|
18
custom_components/easy_computer_manager/computer/common.py
Normal file
18
custom_components/easy_computer_manager/computer/common.py
Normal file
@ -0,0 +1,18 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class OSType(str, Enum):
|
||||
WINDOWS = "Windows"
|
||||
LINUX = "Linux"
|
||||
MACOS = "MacOS"
|
||||
|
||||
|
||||
class CommandOutput:
|
||||
def __init__(self, command: str, return_code: int, output: str, error: str) -> None:
|
||||
self.command = command
|
||||
self.return_code = return_code
|
||||
self.output = output.strip()
|
||||
self.error = error.strip()
|
||||
|
||||
def successful(self) -> bool:
|
||||
return self.return_code == 0
|
@ -0,0 +1,62 @@
|
||||
def format_gnome_monitors_args(monitors_config: dict):
|
||||
args = []
|
||||
|
||||
monitors_config = monitors_config.get('monitors_config', {})
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||
|
||||
if 'position' in settings:
|
||||
args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||
|
||||
if 'mode' in settings:
|
||||
args.extend(['-m', settings["mode"]])
|
||||
|
||||
if 'scale' in settings:
|
||||
args.extend(['-s', str(settings["scale"])])
|
||||
|
||||
if 'transform' in settings:
|
||||
args.extend(['-t', settings["transform"]])
|
||||
|
||||
return ' '.join(args)
|
||||
|
||||
|
||||
def format_pactl_commands(current_config: {}, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||
output_device: str = "@DEFAULT_SINK@"):
|
||||
"""Change audio configuration on the host system."""
|
||||
|
||||
commands = []
|
||||
|
||||
def get_device_id(device_type, user_device):
|
||||
for device in current_config[device_type]:
|
||||
if device['description'] == user_device:
|
||||
return device['name']
|
||||
return user_device
|
||||
|
||||
# Set default sink and source if not specified
|
||||
if not output_device:
|
||||
output_device = "@DEFAULT_SINK@"
|
||||
if not input_device:
|
||||
input_device = "@DEFAULT_SOURCE@"
|
||||
|
||||
# Set default sink if specified
|
||||
if output_device and output_device != "@DEFAULT_SINK@":
|
||||
output_device = get_device_id('sinks', output_device)
|
||||
commands.append(f"set-default-sink {output_device}")
|
||||
|
||||
# Set default source if specified
|
||||
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||
input_device = get_device_id('sources', input_device)
|
||||
commands.append(f"set-default-source {input_device}")
|
||||
|
||||
# Set sink volume if specified
|
||||
if volume is not None:
|
||||
commands.append(f"set-sink-volume {output_device} {volume}%")
|
||||
|
||||
# Set sink and source mute status if specified
|
||||
if mute is not None:
|
||||
commands.append(f"set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||
commands.append(f"set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||
|
||||
return commands
|
168
custom_components/easy_computer_manager/computer/parser.py
Normal file
168
custom_components/easy_computer_manager/computer/parser.py
Normal file
@ -0,0 +1,168 @@
|
||||
import re
|
||||
|
||||
from custom_components.easy_computer_manager import LOGGER
|
||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
||||
|
||||
|
||||
def parse_gnome_monitors_output(config: str) -> list:
|
||||
"""
|
||||
Parse the GNOME monitors configuration.
|
||||
|
||||
:param config:
|
||||
The output of the gnome-monitor-config list command.
|
||||
|
||||
:type config: str
|
||||
|
||||
:returns: list
|
||||
The parsed monitors configuration.
|
||||
"""
|
||||
|
||||
monitors = []
|
||||
current_monitor = None
|
||||
|
||||
for line in config.split('\n'):
|
||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||
if monitor_match:
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
source, status = monitor_match.groups()
|
||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||
elif current_monitor:
|
||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||
if display_name_match:
|
||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||
elif resolution_match:
|
||||
# Don't include resolutions under 1280x720
|
||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||
|
||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||
if len(current_monitor['resolutions']) > 0:
|
||||
last_resolution = current_monitor['resolutions'][-1]
|
||||
last_resolution_size = last_resolution.split('@')[0]
|
||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||
|
||||
# Only truncate some framerates if the resolution are the same
|
||||
if last_resolution_size == this_resolution_size:
|
||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||
|
||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# If the resolution is different, this adds the new resolution
|
||||
# to the list without truncating
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# This is the first resolution, add it to the list
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
|
||||
return monitors
|
||||
|
||||
|
||||
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
|
||||
"""
|
||||
Parse the pactl audio configuration.
|
||||
|
||||
:param config_speakers:
|
||||
The output of the pactl list sinks command.
|
||||
:param config_microphones:
|
||||
The output of the pactl list sources command.
|
||||
|
||||
:type config_speakers: str
|
||||
:type config_microphones: str
|
||||
|
||||
:returns: dict
|
||||
The parsed audio configuration.
|
||||
|
||||
"""
|
||||
|
||||
config = {'speakers': [], 'microphones': []}
|
||||
|
||||
def parse_device_info(lines, device_type):
|
||||
devices = []
|
||||
current_device = {}
|
||||
|
||||
for line in lines:
|
||||
if line.startswith(f"{device_type} #"):
|
||||
if current_device and "Monitor" not in current_device['description']:
|
||||
devices.append(current_device)
|
||||
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||
elif line.startswith(" Name:"):
|
||||
current_device['name'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" State:"):
|
||||
current_device['state'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" Description:"):
|
||||
current_device['description'] = line.split(":")[1].strip()
|
||||
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
|
||||
return devices
|
||||
|
||||
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
|
||||
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
|
||||
return_as_string: bool = False) -> list | str:
|
||||
"""Parse the bluetoothctl info command.
|
||||
|
||||
:param command:
|
||||
The command output.
|
||||
:param connected_devices_only:
|
||||
Only return connected devices.
|
||||
Will return all devices, connected or not, if False.
|
||||
|
||||
:type command: :class: CommandOutput
|
||||
:type connected_devices_only: bool
|
||||
|
||||
:returns: str | list
|
||||
The parsed bluetooth devices.
|
||||
|
||||
"""
|
||||
|
||||
if not command.successful():
|
||||
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
|
||||
return "" if return_as_string else []
|
||||
else:
|
||||
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
|
||||
return "" if return_as_string else []
|
||||
|
||||
devices = []
|
||||
current_device = None
|
||||
|
||||
for line in command.output.split('\n'):
|
||||
if line.startswith('Device'):
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
current_device = line.split()[1]
|
||||
current_name = None
|
||||
current_connected = None
|
||||
elif 'Name:' in line:
|
||||
current_name = line.split(': ', 1)[1]
|
||||
elif 'Connected:' in line:
|
||||
current_connected = line.split(': ')[1] == 'yes'
|
||||
|
||||
# Add the last device if any
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
|
||||
if connected_devices_only:
|
||||
devices = [device for device in devices if device["connected"] == True]
|
||||
|
||||
return devices
|
104
custom_components/easy_computer_manager/computer/ssh_client.py
Normal file
104
custom_components/easy_computer_manager/computer/ssh_client.py
Normal file
@ -0,0 +1,104 @@
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
import paramiko
|
||||
|
||||
from custom_components.easy_computer_manager import LOGGER
|
||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
||||
|
||||
|
||||
class SSHClient:
|
||||
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self._password = password
|
||||
self.port = port
|
||||
self._connection: Optional[paramiko.SSHClient] = None
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||
self.disconnect()
|
||||
|
||||
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
|
||||
"""Open an SSH connection using Paramiko asynchronously."""
|
||||
if self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is already active.")
|
||||
return
|
||||
|
||||
self.disconnect() # Ensure any previous connection is closed
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
client = paramiko.SSHClient()
|
||||
|
||||
# Set missing host key policy to automatically accept unknown host keys
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
try:
|
||||
# Offload the blocking connect call to a thread
|
||||
await loop.run_in_executor(None, self._blocking_connect, client)
|
||||
self._connection = client
|
||||
LOGGER.debug(f"Connected to {self.host}")
|
||||
|
||||
except (OSError, paramiko.SSHException) as exc:
|
||||
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
|
||||
if not retried:
|
||||
LOGGER.debug(f"Retrying connection to {self.host}...")
|
||||
await self.connect(retried=True) # Retry only once
|
||||
|
||||
finally:
|
||||
if computer is not None and hasattr(computer, "initialized"):
|
||||
computer.initialized = True
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close the SSH connection."""
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
LOGGER.debug(f"Disconnected from {self.host}")
|
||||
self._connection = None
|
||||
|
||||
def _blocking_connect(self, client: paramiko.SSHClient):
|
||||
"""Perform the blocking SSH connection using Paramiko."""
|
||||
client.connect(
|
||||
hostname=self.host,
|
||||
username=self.username,
|
||||
password=self._password,
|
||||
port=self.port,
|
||||
look_for_keys=False, # Set this to True if using private keys
|
||||
allow_agent=False
|
||||
)
|
||||
|
||||
async def execute_command(self, command: str) -> CommandOutput:
|
||||
"""Execute a command on the SSH server asynchronously."""
|
||||
if not self.is_connection_alive():
|
||||
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
|
||||
await self.connect()
|
||||
|
||||
try:
|
||||
# Offload command execution to avoid blocking
|
||||
loop = asyncio.get_running_loop()
|
||||
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
|
||||
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
|
||||
|
||||
except (paramiko.SSHException, EOFError) as exc:
|
||||
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
|
||||
return CommandOutput(command, -1, "", "")
|
||||
|
||||
def is_connection_alive(self) -> bool:
|
||||
"""Check if the SSH connection is still alive."""
|
||||
if self._connection is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
transport = self._connection.get_transport()
|
||||
transport.send_ignore()
|
||||
|
||||
self._connection.exec_command('ls', timeout=1)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
return False
|
40
custom_components/easy_computer_manager/computer/utils.py
Normal file
40
custom_components/easy_computer_manager/computer/utils.py
Normal file
@ -0,0 +1,40 @@
|
||||
async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
|
||||
"""Return debug information about the host system."""
|
||||
|
||||
data = {
|
||||
'os': {
|
||||
'name': computer.operating_system,
|
||||
'version': computer.operating_system_version,
|
||||
'desktop_environment': computer.desktop_environment
|
||||
},
|
||||
'connection': {
|
||||
'host': computer.host,
|
||||
'mac': computer.mac,
|
||||
'username': computer.username,
|
||||
'port': computer.port,
|
||||
'dualboot': computer.dualboot,
|
||||
'is_on': await computer.is_on(),
|
||||
'is_connected': computer._connection.is_connection_alive()
|
||||
},
|
||||
'grub': {
|
||||
'windows_entry': computer.windows_entry_grub
|
||||
},
|
||||
'audio': {
|
||||
'speakers': computer.audio_config.get('speakers'),
|
||||
'microphones': computer.audio_config.get('microphones')
|
||||
},
|
||||
'monitors': computer.monitors_config,
|
||||
'bluetooth_devices': computer.bluetooth_devices
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_bluetooth_devices_as_str(computer: 'Computer') -> str:
|
||||
"""Return the bluetooth devices as a string."""
|
||||
devices = computer.bluetooth_devices
|
||||
|
||||
if not devices:
|
||||
return ""
|
||||
|
||||
return "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
@ -7,9 +7,8 @@ from typing import Any
|
||||
import voluptuous as vol
|
||||
from homeassistant import config_entries, exceptions
|
||||
from homeassistant.core import HomeAssistant
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
|
||||
from . import utils
|
||||
from .computer import Computer
|
||||
from .const import DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -40,6 +39,8 @@ class Hub:
|
||||
self._name = host
|
||||
self._id = host.lower()
|
||||
|
||||
self.computer = Computer(host, "", username, password, port)
|
||||
|
||||
@property
|
||||
def hub_id(self) -> str:
|
||||
"""ID for dummy."""
|
||||
@ -48,9 +49,11 @@ class Hub:
|
||||
async def test_connection(self) -> bool:
|
||||
"""Test connectivity to the computer is OK."""
|
||||
try:
|
||||
return utils.test_connection(
|
||||
utils.create_ssh_connection(self._host, self._username, self._password, self._port))
|
||||
except AuthenticationException:
|
||||
# TODO: check if reachable
|
||||
_LOGGER.info("Testing connection to %s", self._host)
|
||||
return True
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
@ -63,6 +66,7 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]:
|
||||
|
||||
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
|
||||
|
||||
_LOGGER.info("Validating configuration")
|
||||
if not await hub.test_connection():
|
||||
raise CannotConnect
|
||||
|
||||
@ -81,7 +85,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
try:
|
||||
info = await validate_input(self.hass, user_input)
|
||||
return self.async_create_entry(title=info["title"], data=user_input)
|
||||
except (AuthenticationException, CannotConnect, InvalidHost) as ex:
|
||||
except (CannotConnect, InvalidHost) as ex:
|
||||
errors["base"] = str(ex)
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Unexpected exception: %s", ex)
|
||||
|
@ -11,3 +11,87 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config"
|
||||
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
|
||||
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
|
||||
SERVICE_DEBUG_INFO = "debug_info"
|
||||
|
||||
|
||||
ACTIONS = {
|
||||
"operating_system": {
|
||||
"linux": ["uname"]
|
||||
},
|
||||
"operating_system_version": {
|
||||
"windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'],
|
||||
"linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"]
|
||||
},
|
||||
"desktop_environment": {
|
||||
"linux": ["for session in $(ls /usr/bin/*session 2>/dev/null); do basename $session | sed 's/-session//'; done | grep -E 'gnome|kde|xfce|mate|lxde|cinnamon|budgie|unity' | head -n 1"],
|
||||
"windows": ["echo Windows"]
|
||||
},
|
||||
"shutdown": {
|
||||
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"],
|
||||
"linux": ["sudo /sbin/shutdown -h now", "sudo /sbin/init 0", "sudo /usr/bin/systemctl poweroff"]
|
||||
},
|
||||
"restart": {
|
||||
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"],
|
||||
"linux": ["sudo /sbin/shutdown -r now", "sudo /sbin/init 6", "sudo /usr/bin/systemctl reboot"]
|
||||
},
|
||||
"sleep": {
|
||||
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"],
|
||||
"linux": ["sudo /usr/bin/systemctl suspend", "sudo /usr/sbin/pm-suspend"]
|
||||
},
|
||||
"get_windows_entry_grub": {
|
||||
"linux": ["sudo /usr/bin/cat /etc/grub2.cfg | awk -F \"'\" '/windows/ {print $2}'",
|
||||
"sudo /usr/bin/cat /etc/grub.cfg | awk -F \"'\" '/windows/ {print $2}'"]
|
||||
},
|
||||
"set_grub_entry": {
|
||||
"linux": {
|
||||
"commands": ["sudo /usr/sbin/grub-reboot %grub-entry%", "sudo /usr/sbin/grub2-reboot %grub-entry%"],
|
||||
"params": ["grub-entry"],
|
||||
}
|
||||
},
|
||||
"get_monitors_config": {
|
||||
"linux": ["gnome-monitor-config list"]
|
||||
},
|
||||
"set_monitors_config": {
|
||||
"linux": {
|
||||
"gnome": {
|
||||
"command": "gnome-monitor-config set %args%",
|
||||
"params": ["args"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"get_speakers": {
|
||||
"linux": ["LANG=en_US.UTF-8 pactl list sinks"]
|
||||
},
|
||||
"get_microphones": {
|
||||
"linux": ["LANG=en_US.UTF-8 pactl list sources"]
|
||||
},
|
||||
"set_audio_config": {
|
||||
"linux": {
|
||||
"command": "LANG=en_US.UTF-8 pactl %args%",
|
||||
"params": ["args"]
|
||||
}
|
||||
},
|
||||
"get_bluetooth_devices": {
|
||||
"linux": {
|
||||
"command": "bluetoothctl info",
|
||||
"raise_on_error": False,
|
||||
}
|
||||
},
|
||||
"install_nirmcd": {
|
||||
"windows": {
|
||||
"command": "powershell -Command \"Invoke-WebRequest -Uri %download_url% -OutFile %install_path%\\nircmd.zip -UseBasicParsing; Expand-Archive %install_path%\\nircmd.zip -DestinationPath %install_path%; Remove-Item %install_path%\\nircmd.zip\"",
|
||||
"params": ["download_url", "install_path"]
|
||||
}
|
||||
},
|
||||
"start_steam_big_picture": {
|
||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||
"windows": "start steam://open/bigpicture"
|
||||
},
|
||||
"stop_steam_big_picture": {
|
||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||
},
|
||||
"exit_steam_big_picture": {
|
||||
"linux": "", # TODO: find a way to exit steam big picture
|
||||
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||
},
|
||||
}
|
@ -2,8 +2,7 @@
|
||||
"domain": "easy_computer_manager",
|
||||
"name": "Easy Computer Manager",
|
||||
"codeowners": [
|
||||
"@M4TH1EU",
|
||||
"@ntilley905"
|
||||
"@M4TH1EU"
|
||||
],
|
||||
"config_flow": true,
|
||||
"dependencies": [],
|
||||
@ -11,10 +10,8 @@
|
||||
"iot_class": "local_polling",
|
||||
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
|
||||
"requirements": [
|
||||
"construct==2.10.70",
|
||||
"wakeonlan==3.1.0",
|
||||
"fabric2==3.2.2",
|
||||
"paramiko==3.4.0"
|
||||
"asyncssh==2.16.0"
|
||||
],
|
||||
"version": "1.3.0"
|
||||
"version": "2.0.0"
|
||||
}
|
||||
|
@ -1,80 +1,53 @@
|
||||
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import subprocess as sp
|
||||
from typing import Any
|
||||
from typing import Any, Dict
|
||||
|
||||
import voluptuous as vol
|
||||
import wakeonlan
|
||||
from homeassistant.components.switch import (SwitchEntity)
|
||||
from homeassistant.components.switch import SwitchEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_MAC,
|
||||
CONF_NAME,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME, )
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import (
|
||||
device_registry as dr,
|
||||
entity_platform,
|
||||
CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||
from homeassistant.helpers import entity_platform, device_registry as dr
|
||||
from homeassistant.helpers.config_validation import make_entity_service_schema
|
||||
from homeassistant.helpers.device_registry import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
|
||||
from . import utils
|
||||
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
||||
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
from .computer import OSType, Computer
|
||||
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
|
||||
from .const import (
|
||||
DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
|
||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
|
||||
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
|
||||
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
# Retrieve the data from the config flow
|
||||
mac_address: str = config.data.get(CONF_MAC)
|
||||
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS)
|
||||
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT)
|
||||
host: str = config.data.get(CONF_HOST)
|
||||
name: str = config.data.get(CONF_NAME)
|
||||
dualboot: bool = config.data.get("dualboot")
|
||||
username: str = config.data.get(CONF_USERNAME)
|
||||
password: str = config.data.get(CONF_PASSWORD)
|
||||
port: int | None = config.data.get(CONF_PORT)
|
||||
"""Set up the computer switch from a config entry."""
|
||||
mac_address = config.data[CONF_MAC]
|
||||
host = config.data[CONF_HOST]
|
||||
name = config.data[CONF_NAME]
|
||||
dualboot = config.data.get("dualboot", False)
|
||||
username = config.data[CONF_USERNAME]
|
||||
password = config.data[CONF_PASSWORD]
|
||||
port = config.data.get(CONF_PORT)
|
||||
|
||||
# Register the computer switch
|
||||
async_add_entities(
|
||||
[
|
||||
ComputerSwitch(
|
||||
hass,
|
||||
name,
|
||||
host,
|
||||
mac_address,
|
||||
dualboot,
|
||||
username,
|
||||
password,
|
||||
port,
|
||||
),
|
||||
],
|
||||
host is not None,
|
||||
[ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
|
||||
True
|
||||
)
|
||||
|
||||
platform = entity_platform.async_get_current_platform()
|
||||
|
||||
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse)
|
||||
# Service registrations
|
||||
services = [
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
|
||||
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
|
||||
@ -91,18 +64,18 @@ async def async_setup_entry(
|
||||
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
|
||||
]
|
||||
|
||||
# Register the services that depends on the switch
|
||||
for service in services:
|
||||
# Register services with their schemas
|
||||
for service_name, schema, supports_response in services:
|
||||
platform.async_register_entity_service(
|
||||
service[0],
|
||||
make_entity_service_schema(service[1]),
|
||||
service[0],
|
||||
supports_response=service[2]
|
||||
service_name,
|
||||
make_entity_service_schema(schema),
|
||||
service_name,
|
||||
supports_response=supports_response
|
||||
)
|
||||
|
||||
|
||||
class ComputerSwitch(SwitchEntity):
|
||||
"""Representation of a computer switch."""
|
||||
"""Representation of a computer switch entity."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -110,216 +83,121 @@ class ComputerSwitch(SwitchEntity):
|
||||
name: str,
|
||||
host: str | None,
|
||||
mac_address: str,
|
||||
# broadcast_address: str | None,
|
||||
# broadcast_port: int | None,
|
||||
dualboot: bool | False,
|
||||
dualboot: bool,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int | None,
|
||||
) -> None:
|
||||
"""Initialize the WOL switch."""
|
||||
|
||||
self._hass = hass
|
||||
"""Initialize the computer switch entity."""
|
||||
self.hass = hass
|
||||
self._attr_name = name
|
||||
self._host = host
|
||||
self._mac_address = mac_address
|
||||
# self._broadcast_address = broadcast_address
|
||||
# self._broadcast_port = broadcast_port
|
||||
self._dualboot = dualboot
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._port = port
|
||||
self._state = False
|
||||
self._attr_assumed_state = host is None
|
||||
self._attr_should_poll = bool(not self._attr_assumed_state)
|
||||
self._attr_unique_id = dr.format_mac(mac_address)
|
||||
self._state = False
|
||||
self._attr_should_poll = not self._attr_assumed_state
|
||||
self._attr_extra_state_attributes = {}
|
||||
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||
|
||||
self.computer = Computer(host, mac_address, username, password, port, dualboot)
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo | None:
|
||||
"""Return the device information."""
|
||||
if self._host is None:
|
||||
return None
|
||||
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return device info for the registry."""
|
||||
return DeviceInfo(
|
||||
identifiers={(DOMAIN, self._mac_address)},
|
||||
identifiers={(DOMAIN, self.computer.mac)},
|
||||
name=self._attr_name,
|
||||
manufacturer="Generic",
|
||||
model="Computer",
|
||||
sw_version=utils.get_operating_system_version(
|
||||
self._connection) if self._state else "Unknown",
|
||||
connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
|
||||
sw_version=self.computer.operating_system_version,
|
||||
connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)},
|
||||
)
|
||||
|
||||
@property
|
||||
def icon(self) -> str:
|
||||
"""Return the icon to use in the frontend, if any."""
|
||||
return "mdi:monitor" if self._state else "mdi:monitor-off"
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
"""Return true if the computer switch is on."""
|
||||
"""Return true if the computer is on."""
|
||||
return self._state
|
||||
|
||||
def turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer on using wake on lan."""
|
||||
service_kwargs: dict[str, Any] = {}
|
||||
# if self._broadcast_address is not None:
|
||||
# service_kwargs["ip_address"] = self._broadcast_address
|
||||
# if self._broadcast_port is not None:
|
||||
# service_kwargs["port"] = self._broadcast_port
|
||||
|
||||
_LOGGER.debug(
|
||||
"Send magic packet to mac %s (broadcast: %s, port: %s)",
|
||||
self._mac_address,
|
||||
# self._broadcast_address,
|
||||
# self._broadcast_port,
|
||||
)
|
||||
|
||||
wakeonlan.send_magic_packet(self._mac_address, **service_kwargs)
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer on using Wake-on-LAN."""
|
||||
await self.computer.start()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
self._state = True
|
||||
self.async_write_ha_state()
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer off using appropriate shutdown command based on running OS and/or distro."""
|
||||
utils.shutdown_system(self._connection)
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the computer off via shutdown command."""
|
||||
await self.computer.shutdown()
|
||||
|
||||
if self._attr_assumed_state:
|
||||
self._state = False
|
||||
self.async_write_ha_state()
|
||||
|
||||
def restart_to_windows_from_linux(self) -> None:
|
||||
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
|
||||
async def async_update(self) -> None:
|
||||
"""Update the state by checking if the computer is on."""
|
||||
is_on = await self.computer.is_on()
|
||||
if self.is_on != is_on:
|
||||
self._state = is_on
|
||||
# self.async_write_ha_state()
|
||||
|
||||
if self._dualboot:
|
||||
utils.restart_to_windows_from_linux(self._connection)
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
def restart_to_linux_from_windows(self) -> None:
|
||||
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
|
||||
|
||||
if self._dualboot:
|
||||
# TODO: check for default grub entry and adapt accordingly
|
||||
utils.restart_system(self._connection)
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
def put_computer_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep using appropriate sleep command based on running OS and/or distro."""
|
||||
utils.sleep_system(self._connection)
|
||||
|
||||
def start_computer_to_windows(self) -> None:
|
||||
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
|
||||
self.turn_on()
|
||||
|
||||
if self._dualboot:
|
||||
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||
self._hass.loop.create_task(self.service_restart_to_windows_from_linux())
|
||||
|
||||
else:
|
||||
_LOGGER.error(
|
||||
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||
"correctly in the UI.",
|
||||
self._host)
|
||||
|
||||
async def service_restart_to_windows_from_linux(self) -> None:
|
||||
"""Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows."""
|
||||
while not self.is_on:
|
||||
await asyncio.sleep(3)
|
||||
|
||||
await utils.restart_to_windows_from_linux(self._connection)
|
||||
|
||||
def restart_computer(self) -> None:
|
||||
"""Restart the computer using appropriate restart command based on running OS and/or distro."""
|
||||
|
||||
# TODO: check for default grub entry and adapt accordingly
|
||||
if self._dualboot and not utils.is_unix_system(connection=self._connection):
|
||||
utils.restart_system(self._connection)
|
||||
|
||||
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||
self.restart_to_windows_from_linux()
|
||||
else:
|
||||
utils.restart_system(self._connection)
|
||||
|
||||
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
|
||||
"""Change the monitors configuration using a YAML config file."""
|
||||
if monitors_config is not None and len(monitors_config) > 0:
|
||||
utils.change_monitors_config(self._connection, monitors_config)
|
||||
else:
|
||||
raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.")
|
||||
|
||||
def steam_big_picture(self, action: str) -> None:
|
||||
"""Controls Steam Big Picture mode."""
|
||||
|
||||
if action is not None:
|
||||
utils.steam_big_picture(self._connection, action)
|
||||
else:
|
||||
raise HomeAssistantError("The 'action' parameter must be specified.")
|
||||
|
||||
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
|
||||
output_device: str | None = None) -> None:
|
||||
"""Change the audio configuration using a YAML config file."""
|
||||
utils.change_audio_config(self._connection, volume, mute, input_device, output_device)
|
||||
|
||||
def update(self) -> None:
|
||||
"""Ping the computer to see if it is online and update the state."""
|
||||
timeout = 1
|
||||
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)]
|
||||
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
|
||||
self._state = not bool(status)
|
||||
|
||||
# Update the state attributes and the connection only if the computer is on
|
||||
if self._state:
|
||||
if self._connection is None or not utils.test_connection(self._connection):
|
||||
self.renew_ssh_connection()
|
||||
|
||||
if not self._state:
|
||||
return
|
||||
# If the computer is on, update its attributes
|
||||
if is_on:
|
||||
await self.computer.update(is_on)
|
||||
|
||||
self._attr_extra_state_attributes = {
|
||||
"operating_system": utils.get_operating_system(self._connection),
|
||||
"operating_system_version": utils.get_operating_system_version(self._connection),
|
||||
"mac_address": self._mac_address,
|
||||
"ip_address": self._host,
|
||||
"connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True),
|
||||
"operating_system": self.computer.operating_system,
|
||||
"operating_system_version": self.computer.operating_system_version,
|
||||
"mac_address": self.computer.mac,
|
||||
"ip_address": self.computer.host,
|
||||
"connected_devices": get_bluetooth_devices_as_str(self.computer),
|
||||
}
|
||||
|
||||
def renew_ssh_connection(self) -> None:
|
||||
"""Renew the SSH connection."""
|
||||
_LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username)
|
||||
# Service methods for various functionalities
|
||||
async def restart_to_windows_from_linux(self) -> None:
|
||||
"""Restart the computer from Linux to Windows."""
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
if self._connection is not None:
|
||||
self._connection.close()
|
||||
async def restart_to_linux_from_windows(self) -> None:
|
||||
"""Restart the computer from Windows to Linux."""
|
||||
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
|
||||
|
||||
try:
|
||||
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||
self._connection.open()
|
||||
except AuthenticationException as error:
|
||||
_LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error)
|
||||
self._state = False
|
||||
except Exception as error:
|
||||
_LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error)
|
||||
async def put_computer_to_sleep(self) -> None:
|
||||
"""Put the computer to sleep."""
|
||||
await self.computer.put_to_sleep()
|
||||
|
||||
# Check if the error is due to timeout
|
||||
if "timed out" in str(error):
|
||||
_LOGGER.warning(
|
||||
"Computer at %s does not respond to the SSH request. Possible causes: might be offline, "
|
||||
"the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.",
|
||||
self._host
|
||||
)
|
||||
async def start_computer_to_windows(self) -> None:
|
||||
"""Start the computer to Windows after booting into Linux first."""
|
||||
await self.computer.start()
|
||||
|
||||
self._state = False
|
||||
async def wait_and_reboot() -> None:
|
||||
"""Wait until the computer is on, then restart to Windows."""
|
||||
while not await self.computer.is_on():
|
||||
await asyncio.sleep(3)
|
||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
||||
|
||||
def debug_info(self) -> ServiceResponse:
|
||||
"""Prints debug info."""
|
||||
return utils.get_debug_info(self._connection)
|
||||
self.hass.loop.create_task(wait_and_reboot())
|
||||
|
||||
async def restart_computer(self) -> None:
|
||||
"""Restart the computer."""
|
||||
await self.computer.restart()
|
||||
|
||||
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
||||
"""Change the monitor configuration."""
|
||||
await self.computer.set_monitors_config(monitors_config)
|
||||
|
||||
async def steam_big_picture(self, action: str) -> None:
|
||||
"""Control Steam Big Picture mode."""
|
||||
await self.computer.steam_big_picture(action)
|
||||
|
||||
async def change_audio_config(
|
||||
self, volume: int | None = None, mute: bool | None = None,
|
||||
input_device: str | None = None, output_device: str | None = None
|
||||
) -> None:
|
||||
"""Change the audio configuration."""
|
||||
await self.computer.set_audio_config(volume, mute, input_device, output_device)
|
||||
|
||||
async def debug_info(self) -> ServiceResponse:
|
||||
"""Return debug information."""
|
||||
return await format_debug_information(self.computer)
|
||||
|
@ -1,550 +0,0 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
import fabric2
|
||||
from fabric2 import Connection
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# _LOGGER.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def create_ssh_connection(host: str, username: str, password: str, port=22):
|
||||
"""Create an SSH connection to a host using a username and password specified in the config flow."""
|
||||
conf = fabric2.Config()
|
||||
conf.run.hide = True
|
||||
conf.run.warn = True
|
||||
conf.warn = True
|
||||
conf.sudo.password = password
|
||||
conf.password = password
|
||||
|
||||
connection = Connection(
|
||||
host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password},
|
||||
config=conf
|
||||
)
|
||||
|
||||
_LOGGER.info("Successfully created SSH connection to %s using username %s", host, username)
|
||||
|
||||
return connection
|
||||
|
||||
|
||||
def test_connection(connection: Connection):
|
||||
"""Test the connection to the host by running a simple command."""
|
||||
try:
|
||||
connection.run('ls')
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def is_unix_system(connection: Connection):
|
||||
"""Return a boolean based on get_operating_system result."""
|
||||
return get_operating_system(connection) == "Linux/Unix"
|
||||
|
||||
|
||||
def get_operating_system_version(connection: Connection, is_unix=None):
|
||||
"""Return the running operating system name and version."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
if is_unix:
|
||||
result = connection.run(
|
||||
"awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo").stdout
|
||||
if result == "":
|
||||
result = connection.run("lsb_release -a | awk '/Description/ {print $2, $3, $4}'").stdout
|
||||
|
||||
return result
|
||||
else:
|
||||
return connection.run(
|
||||
'for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i').stdout
|
||||
|
||||
|
||||
def get_operating_system(connection: Connection):
|
||||
"""Return the running operating system type."""
|
||||
# TODO: might be a better way to do this
|
||||
result = connection.run("uname")
|
||||
if result.return_code == 0:
|
||||
return "Linux/Unix"
|
||||
else:
|
||||
return "Windows/Other"
|
||||
|
||||
|
||||
def shutdown_system(connection: Connection, is_unix=None):
|
||||
"""Shutdown the system."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
shutdown_commands = {
|
||||
"unix": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"],
|
||||
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"]
|
||||
}
|
||||
|
||||
for command in shutdown_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System shutting down on %s.", connection.host)
|
||||
connection.close()
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot shutdown system running at {connection.host}, all methods failed.")
|
||||
|
||||
|
||||
def restart_system(connection: Connection, is_unix=None):
|
||||
"""Restart the system."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
restart_commands = {
|
||||
"unix": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"],
|
||||
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"]
|
||||
}
|
||||
|
||||
for command in restart_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System restarting on %s.", connection.host)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot restart system running at {connection.host}, all methods failed.")
|
||||
|
||||
|
||||
def sleep_system(connection: Connection, is_unix=None):
|
||||
"""Put the system to sleep."""
|
||||
|
||||
if is_unix is None:
|
||||
is_unix = is_unix_system(connection)
|
||||
|
||||
sleep_commands = {
|
||||
"unix": ["sudo systemctl suspend", "sudo pm-suspend"],
|
||||
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"]
|
||||
}
|
||||
|
||||
for command in sleep_commands["unix" if is_unix else "windows"]:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("System sleeping on %s.", connection.host)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Cannot put system running at {connection.host} to sleep, all methods failed.")
|
||||
|
||||
|
||||
def get_windows_entry_in_grub(connection: Connection):
|
||||
"""
|
||||
Grabs the Windows entry name in GRUB.
|
||||
Used later with grub-reboot to specify which entry to boot.
|
||||
"""
|
||||
commands = [
|
||||
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
|
||||
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"
|
||||
]
|
||||
|
||||
for command in commands:
|
||||
result = connection.run(command)
|
||||
if result.return_code == 0 and result.stdout.strip():
|
||||
_LOGGER.debug("Found Windows entry in GRUB: " + result.stdout.strip())
|
||||
return result.stdout.strip()
|
||||
|
||||
_LOGGER.error("Could not find Windows entry in GRUB for system running at %s.", connection.host)
|
||||
return None
|
||||
|
||||
|
||||
def restart_to_windows_from_linux(connection: Connection):
|
||||
"""Restart a running Linux system to Windows."""
|
||||
|
||||
if not is_unix_system(connection):
|
||||
raise HomeAssistantError(f"System running at {connection.host} is not a Linux system.")
|
||||
|
||||
windows_entry = get_windows_entry_in_grub(connection)
|
||||
|
||||
if windows_entry is not None:
|
||||
reboot_commands = ["sudo grub-reboot", "sudo grub2-reboot"]
|
||||
|
||||
for reboot_command in reboot_commands:
|
||||
result = connection.run(f"{reboot_command} \"{windows_entry}\"")
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.debug("Rebooting to Windows")
|
||||
restart_system(connection)
|
||||
return
|
||||
|
||||
raise HomeAssistantError(f"Failed to restart system running on {connection.host} to Windows from Linux.")
|
||||
else:
|
||||
raise HomeAssistantError(f"Could not find Windows entry in grub for system running at {connection.host}.")
|
||||
|
||||
|
||||
def change_monitors_config(connection: Connection, monitors_config: dict):
|
||||
"""Change monitors configuration on the host (Linux + Gnome, and partial Windows support)."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
command_parts = ["gnome-monitor-config", "set"]
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
command_parts.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||
|
||||
if 'position' in settings:
|
||||
command_parts.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||
|
||||
if 'mode' in settings:
|
||||
command_parts.extend(['-m', settings["mode"]])
|
||||
|
||||
if 'scale' in settings:
|
||||
command_parts.extend(['-s', str(settings["scale"])])
|
||||
|
||||
if 'transform' in settings:
|
||||
command_parts.extend(['-t', settings["transform"]])
|
||||
|
||||
command = ' '.join(command_parts)
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||
|
||||
# Run it once again, it fixes some strange Gnome display bug sometimes and it doesn't hurt
|
||||
connection.run(command)
|
||||
else:
|
||||
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||
connection.host)
|
||||
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
# TODO: Implement Windows support using NIRCMD
|
||||
command_parts = ["nircmd.exe", "setdisplay"]
|
||||
# setdisplay {monitor:index/name} [width] [height] [color bits] {refresh rate} {-updatereg} {-allusers}
|
||||
|
||||
for monitor, settings in monitors_config.items():
|
||||
if settings.get('enabled', False):
|
||||
command_parts.extend(
|
||||
[f'{monitor} -primary' if settings.get('primary', False) else f'{monitor} -secondary'])
|
||||
|
||||
if 'resolution' in settings:
|
||||
command_parts.extend([str(settings["resolution"][0]), str(settings["resolution"][1])])
|
||||
|
||||
if 'refresh_rate' in settings:
|
||||
command_parts.extend(['-hz', str(settings["refresh_rate"])])
|
||||
|
||||
if 'color_bits' in settings:
|
||||
command_parts.extend(['-bits', str(settings["color_bits"])])
|
||||
|
||||
command = ' '.join(command_parts)
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code == 0:
|
||||
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||
else:
|
||||
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||
connection.host)
|
||||
|
||||
|
||||
def silent_install_nircmd(connection: Connection):
|
||||
"""Silently install NIRCMD on a Windows system."""
|
||||
|
||||
if not is_unix_system(connection):
|
||||
download_url = "https://www.nirsoft.net/utils/nircmd.zip"
|
||||
install_path = f"C:\\Users\\{connection.user}\\AppData\\Local\\EasyComputerManager"
|
||||
|
||||
# Download and unzip NIRCMD
|
||||
download_command = f"powershell -Command \"Invoke-WebRequest -Uri {download_url} -OutFile {install_path}\\nircmd.zip -UseBasicParsing\""
|
||||
unzip_command = f"powershell -Command \"Expand-Archive {install_path}\\nircmd.zip -DestinationPath {install_path}\""
|
||||
remove_zip_command = f"powershell -Command \"Remove-Item {install_path}\\nircmd.zip\""
|
||||
|
||||
commands = [download_command, unzip_command, remove_zip_command]
|
||||
|
||||
for command in commands:
|
||||
result = connection.run(command)
|
||||
if result.return_code != 0:
|
||||
_LOGGER.error("Could not install NIRCMD on system running on %s.", connection.host)
|
||||
return
|
||||
|
||||
|
||||
def get_monitors_config(connection: Connection) -> dict:
|
||||
"""Parse the output of the gnome-monitor-config command to get the current monitor configuration."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run("gnome-monitor-config list")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get monitors config on system running at {connection.host}. "
|
||||
f"Make sure gnome-monitor-config package is installed.")
|
||||
|
||||
monitors = []
|
||||
current_monitor = None
|
||||
|
||||
for line in result.stdout.split('\n'):
|
||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||
if monitor_match:
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
source, status = monitor_match.groups()
|
||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||
elif current_monitor:
|
||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||
if display_name_match:
|
||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||
elif resolution_match:
|
||||
# Don't include resolutions under 1280x720
|
||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||
|
||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||
if len(current_monitor['resolutions']) > 0:
|
||||
last_resolution = current_monitor['resolutions'][-1]
|
||||
last_resolution_size = last_resolution.split('@')[0]
|
||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||
|
||||
# Only truncate some framerates if the resolution are the same
|
||||
if last_resolution_size == this_resolution_size:
|
||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||
|
||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# If the resolution is different, this adds the new resolution
|
||||
# to the list without truncating
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
else:
|
||||
# This is the first resolution, add it to the list
|
||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||
|
||||
if current_monitor:
|
||||
monitors.append(current_monitor)
|
||||
|
||||
return monitors
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def steam_big_picture(connection: Connection, action: str):
|
||||
"""Controls Steam in Big Picture mode on the host."""
|
||||
|
||||
_LOGGER.debug(f"Running Steam Big Picture action {action} on system running at {connection.host}.")
|
||||
|
||||
steam_commands = {
|
||||
"start": {
|
||||
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||
"windows": "start steam://open/bigpicture"
|
||||
},
|
||||
"stop": {
|
||||
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||
# TODO: check for different Steam install paths
|
||||
},
|
||||
"exit": {
|
||||
"unix": None, # TODO: find a way to exit Steam Big Picture
|
||||
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||
# TODO: need to test (thx @MasterHidra https://www.reddit.com/r/Steam/comments/5c9l20/comment/k5fmb3k)
|
||||
}
|
||||
}
|
||||
|
||||
command = steam_commands.get(action)
|
||||
|
||||
if command is None:
|
||||
raise HomeAssistantError(
|
||||
f"Invalid action {action} for Steam Big Picture on system running at {connection.host}.")
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run(command.get("unix"))
|
||||
else:
|
||||
result = connection.run(command.get("windows"))
|
||||
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not {action} Steam Big Picture on system running at {connection.host}.")
|
||||
|
||||
|
||||
def get_audio_config(connection: Connection):
|
||||
if is_unix_system(connection):
|
||||
config = {'sinks': [], 'sources': []}
|
||||
|
||||
def parse_device_info(lines, device_type):
|
||||
devices = []
|
||||
current_device = {}
|
||||
|
||||
for line in lines:
|
||||
if line.startswith(f"{device_type} #"):
|
||||
if current_device and "Monitor" not in current_device['description']:
|
||||
devices.append(current_device)
|
||||
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||
elif line.startswith(" Name:"):
|
||||
current_device['name'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" State:"):
|
||||
current_device['state'] = line.split(":")[1].strip()
|
||||
elif line.startswith(" Description:"):
|
||||
current_device['description'] = line.split(":")[1].strip()
|
||||
|
||||
if current_device:
|
||||
devices.append(current_device)
|
||||
|
||||
return devices
|
||||
|
||||
# Get sinks
|
||||
result = connection.run("LANG=en_US.UTF-8 pactl list sinks")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get audio sinks on system running at {connection.host}. "
|
||||
f"Make sure pactl package is installed!")
|
||||
config['sinks'] = parse_device_info(result.stdout.split('\n'), 'Sink')
|
||||
|
||||
# Get sources
|
||||
result = connection.run("LANG=en_US.UTF-8 pactl list sources")
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(f"Could not get audio sources on system running at {connection.host}."
|
||||
f"Make sure pactl package is installed!")
|
||||
config['sources'] = parse_device_info(result.stdout.split('\n'), 'Source')
|
||||
|
||||
return config
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def change_audio_config(connection: Connection, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||
output_device: str = "@DEFAULT_SINK@"):
|
||||
"""Change audio configuration on the host system."""
|
||||
|
||||
if is_unix_system(connection):
|
||||
current_config = get_audio_config(connection)
|
||||
executable = "pactl"
|
||||
commands = []
|
||||
|
||||
def get_device_id(device_type, user_device):
|
||||
for device in current_config[device_type]:
|
||||
if device['description'] == user_device:
|
||||
return device['name']
|
||||
return user_device
|
||||
|
||||
# Set default sink and source if not specified
|
||||
if not output_device:
|
||||
output_device = "@DEFAULT_SINK@"
|
||||
if not input_device:
|
||||
input_device = "@DEFAULT_SOURCE@"
|
||||
|
||||
# Set default sink if specified
|
||||
if output_device and output_device != "@DEFAULT_SINK@":
|
||||
output_device = get_device_id('sinks', output_device)
|
||||
commands.append(f"{executable} set-default-sink {output_device}")
|
||||
|
||||
# Set default source if specified
|
||||
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||
input_device = get_device_id('sources', input_device)
|
||||
commands.append(f"{executable} set-default-source {input_device}")
|
||||
|
||||
# Set sink volume if specified
|
||||
if volume is not None:
|
||||
commands.append(f"{executable} set-sink-volume {output_device} {volume}%")
|
||||
|
||||
# Set sink and source mute status if specified
|
||||
if mute is not None:
|
||||
commands.append(f"{executable} set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||
commands.append(f"{executable} set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||
|
||||
# Execute commands
|
||||
for command in commands:
|
||||
_LOGGER.debug("Running command: %s", command)
|
||||
result = connection.run(command)
|
||||
|
||||
if result.return_code != 0:
|
||||
raise HomeAssistantError(
|
||||
f"Could not change audio config on system running on {connection.host}, check logs with debug and"
|
||||
f"make sure pactl package is installed!")
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||
|
||||
|
||||
def get_debug_info(connection: Connection):
|
||||
"""Return debug information about the host system."""
|
||||
|
||||
data = {}
|
||||
|
||||
data_os = {
|
||||
'name': get_operating_system(connection),
|
||||
'version': get_operating_system_version(connection),
|
||||
'is_unix': is_unix_system(connection)
|
||||
}
|
||||
|
||||
data_ssh = {
|
||||
'is_connected': connection.is_connected,
|
||||
'username': connection.user,
|
||||
'host': connection.host,
|
||||
'port': connection.port
|
||||
}
|
||||
|
||||
data_grub = {
|
||||
'windows_entry': get_windows_entry_in_grub(connection)
|
||||
}
|
||||
|
||||
data_audio = {
|
||||
'speakers': get_audio_config(connection).get('sinks'),
|
||||
'microphones': get_audio_config(connection).get('sources')
|
||||
}
|
||||
|
||||
data['os'] = data_os
|
||||
data['ssh'] = data_ssh
|
||||
data['grub'] = data_grub
|
||||
data['audio'] = data_audio
|
||||
data['monitors'] = get_monitors_config(connection)
|
||||
data['bluetooth_devices'] = get_bluetooth_devices(connection)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False) -> []:
|
||||
commands = {
|
||||
"unix": "bluetoothctl info",
|
||||
"windows": "",
|
||||
}
|
||||
|
||||
if is_unix_system(connection):
|
||||
result = connection.run(commands["unix"])
|
||||
if result.return_code != 0:
|
||||
if result.stdout.__contains__("Missing device address argument"): # Means no devices are connected
|
||||
return []
|
||||
else:
|
||||
_LOGGER.warning(f"Cannot retrieve bluetooth devices at {connection.host}. "
|
||||
f"Make sure bluetoothctl is installed!")
|
||||
return []
|
||||
|
||||
devices = []
|
||||
current_device = None
|
||||
|
||||
for line in result.stdout.split('\n'):
|
||||
if line.startswith('Device'):
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
current_device = line.split()[1]
|
||||
current_name = None
|
||||
current_connected = None
|
||||
elif 'Name:' in line:
|
||||
current_name = line.split(': ', 1)[1]
|
||||
elif 'Connected:' in line:
|
||||
current_connected = line.split(': ')[1] == 'yes'
|
||||
|
||||
# Add the last device if any
|
||||
if current_device is not None:
|
||||
devices.append({
|
||||
"address": current_device,
|
||||
"name": current_name,
|
||||
"connected": current_connected
|
||||
})
|
||||
|
||||
if only_connected:
|
||||
devices = [device for device in devices if device["connected"] == "yes"]
|
||||
|
||||
if return_as_string:
|
||||
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
||||
|
||||
return devices
|
||||
else:
|
||||
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
@ -1,5 +1,4 @@
|
||||
fabric2~=3.2.2
|
||||
paramiko~=3.4.0
|
||||
voluptuous~=0.14.2
|
||||
wakeonlan~=3.1.0
|
||||
homeassistant~=2024.3.1
|
||||
homeassistant
|
||||
paramiko~=3.5.0
|
||||
voluptuous~=0.15.2
|
Loading…
Reference in New Issue
Block a user