Compare commits
No commits in common. "main" and "1.4.1" have entirely different histories.
22
.github/wiki/script-auto-config-linux.sh
vendored
22
.github/wiki/script-auto-config-linux.sh
vendored
@ -32,7 +32,7 @@ fi
|
|||||||
# Configure sudoers
|
# Configure sudoers
|
||||||
print_colored "$COLOR_BLUE" "Configuring sudoers..."
|
print_colored "$COLOR_BLUE" "Configuring sudoers..."
|
||||||
echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers > /dev/null
|
echo -e "\n# Allow your user to execute specific commands without a password (for EasyComputerManager/HA)" | sudo tee -a /etc/sudoers > /dev/null
|
||||||
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/sbin/pm-suspend, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot, /usr/bin/cat /etc/grub2.cfg, /usr/bin/cat /etc/grub.cfg, /usr/bin/systemctl poweroff, /usr/bin/systemctl reboot, /usr/bin/systemctl suspend" | sudo tee -a /etc/sudoers > /dev/null
|
echo "$USER_BEHIND_SUDO ALL=(ALL) NOPASSWD: /sbin/shutdown, /sbin/init, /usr/bin/systemctl, /usr/sbin/pm-suspend, /usr/bin/awk, /usr/sbin/grub-reboot, /usr/sbin/grub2-reboot" | sudo tee -a /etc/sudoers > /dev/null
|
||||||
print_colored "$COLOR_GREEN" "Sudoers file configured successfully."
|
print_colored "$COLOR_GREEN" "Sudoers file configured successfully."
|
||||||
|
|
||||||
# Firewall Configuration
|
# Firewall Configuration
|
||||||
@ -51,16 +51,16 @@ DESKTOP_ENTRY_NAME="EasyComputerManager-AutoStart"
|
|||||||
DESKTOP_ENTRY_PATH="/home/$USER_BEHIND_SUDO/.config/autostart/$DESKTOP_ENTRY_NAME.desktop"
|
DESKTOP_ENTRY_PATH="/home/$USER_BEHIND_SUDO/.config/autostart/$DESKTOP_ENTRY_NAME.desktop"
|
||||||
|
|
||||||
# Create the desktop entry file for the Desktop Environment to autostart at login every reboot
|
# Create the desktop entry file for the Desktop Environment to autostart at login every reboot
|
||||||
cat > "$DESKTOP_ENTRY_PATH" <<EOF
|
# cat > "$DESKTOP_ENTRY_PATH" <<EOF
|
||||||
[Desktop Entry]
|
# [Desktop Entry]
|
||||||
Type=Application
|
# Type=Application
|
||||||
Name=$DESKTOP_ENTRY_NAME
|
# Name=$DESKTOP_ENTRY_NAME
|
||||||
Exec=sh -c '$COMMANDS'
|
# Exec=sh -c '$COMMANDS'
|
||||||
Hidden=false
|
# Hidden=false
|
||||||
NoDisplay=false
|
# NoDisplay=false
|
||||||
X-GNOME-Autostart-enabled=true
|
# X-GNOME-Autostart-enabled=true
|
||||||
EOF
|
# EOF
|
||||||
chmod +x "$DESKTOP_ENTRY_PATH"
|
# chmod +x "$DESKTOP_ENTRY_PATH"
|
||||||
print_colored "$COLOR_GREEN" "Desktop entry created at $DESKTOP_ENTRY_PATH."
|
print_colored "$COLOR_GREEN" "Desktop entry created at $DESKTOP_ENTRY_PATH."
|
||||||
|
|
||||||
print_colored "$COLOR_GREEN" "\nDone! Some features may require a reboot to work including:"
|
print_colored "$COLOR_GREEN" "\nDone! Some features may require a reboot to work including:"
|
||||||
|
24
README.md
24
README.md
@ -28,11 +28,7 @@ if dual-boot), adjusting audio configurations, changing monitor settings, and mo
|
|||||||
1. Install [HACS](https://hacs.xyz/) if not already installed.
|
1. Install [HACS](https://hacs.xyz/) if not already installed.
|
||||||
2. In Home Assistant, go to "HACS" in the sidebar.
|
2. In Home Assistant, go to "HACS" in the sidebar.
|
||||||
3. Click on "Integrations."
|
3. Click on "Integrations."
|
||||||
4. Click on the three dots in the top right corner and select "Custom repositories."
|
4. Search for "easy_computer_manager" and click "Install."
|
||||||
5. Paste the following URL in the "Repo" field: https://github.com/M4TH1EU/HA-EasyComputerManager
|
|
||||||
6. Select "Integration" from the "Category" dropdown.
|
|
||||||
7. Click "Add."
|
|
||||||
8. Search for "easy_computer_manager" and click "Install."
|
|
||||||
|
|
||||||
### Manually
|
### Manually
|
||||||
|
|
||||||
@ -41,34 +37,22 @@ if dual-boot), adjusting audio configurations, changing monitor settings, and mo
|
|||||||
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your
|
3. Copy the "custom_components/easy_computer_manager" directory to the "config/custom_components/" directory in your
|
||||||
Home Assistant instance.
|
Home Assistant instance.
|
||||||
|
|
||||||
# Preparing your computer (required)
|
## Configuration
|
||||||
> [!CAUTION]
|
|
||||||
> Before adding your computer to Home Assistant, ensure that it is properly configured.
|
|
||||||
|
|
||||||
**See wiki page [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Prepare-your-computer).**
|
|
||||||
|
|
||||||
## Add Computer to Home Assistant
|
|
||||||
|
|
||||||
1. In Home Assistant, go to "Configuration" in the sidebar.
|
1. In Home Assistant, go to "Configuration" in the sidebar.
|
||||||
2. Select "Integrations."
|
2. Select "Integrations."
|
||||||
3. Click the "+" button to add a new integration.
|
3. Click the "+" button to add a new integration.
|
||||||
4. Search for "Easy Computer Manager" and select it from the list.
|
4. Search for "Easy Computer Manager" and select it from the list.
|
||||||
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, mac-adress, username,
|
5. Follow the on-screen instructions to configure the integration, providing details such as the IP address, username,
|
||||||
and password for the computer you want to add.
|
and password for the computer you want to manage.
|
||||||
6. Once configured, click "Finish" to add the computer to Home Assistant.
|
6. Once configured, click "Finish" to add the computer to Home Assistant.
|
||||||
|
|
||||||
> [!NOTE]
|
|
||||||
> If you are managing a dual-boot computer, ensure that the "Dual boot system" checkbox is enabled during the configuration.
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the
|
After adding your computer to Home Assistant, you can use the provided services to manage it remotely. Explore the
|
||||||
available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your
|
available services in the Home Assistant "Services" tab or use automations to integrate Easy Computer Manager into your
|
||||||
smart home setup.
|
smart home setup.
|
||||||
|
|
||||||
## Services
|
|
||||||
A detailed list of available services and their parameters can be found in the wiki [here](https://github.com/M4TH1EU/HA-EasyComputerManager/wiki/Services).
|
|
||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
If you encounter any issues during installation or configuration, refer to the troubleshooting section in
|
If you encounter any issues during installation or configuration, refer to the troubleshooting section in
|
||||||
|
0
custom_components/__init__.py
Normal file
0
custom_components/__init__.py
Normal file
@ -16,7 +16,7 @@ from homeassistant.core import HomeAssistant, ServiceCall
|
|||||||
|
|
||||||
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
from .const import DOMAIN, SERVICE_SEND_MAGIC_PACKET, SERVICE_CHANGE_MONITORS_CONFIG
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA = vol.Schema({
|
||||||
vol.Required(CONF_MAC): cv.string,
|
vol.Required(CONF_MAC): cv.string,
|
||||||
@ -40,7 +40,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
if broadcast_port is not None:
|
if broadcast_port is not None:
|
||||||
service_kwargs["port"] = broadcast_port
|
service_kwargs["port"] = broadcast_port
|
||||||
|
|
||||||
LOGGER.info(
|
_LOGGER.info(
|
||||||
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
"Sending magic packet to MAC %s (broadcast: %s, port: %s)",
|
||||||
mac_address,
|
mac_address,
|
||||||
broadcast_address,
|
broadcast_address,
|
||||||
@ -59,7 +59,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
|
schema=WAKE_ON_LAN_SEND_MAGIC_PACKET_SCHEMA,
|
||||||
)
|
)
|
||||||
|
|
||||||
await hass.config_entries.async_forward_entry_setups(entry, ["switch"])
|
hass.async_create_task(
|
||||||
|
hass.config_entries.async_forward_entry_setup(
|
||||||
|
entry, "switch"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -1,208 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from typing import Optional, Dict, Any
|
|
||||||
|
|
||||||
from wakeonlan import send_magic_packet
|
|
||||||
|
|
||||||
from custom_components.easy_computer_manager import const, LOGGER
|
|
||||||
from custom_components.easy_computer_manager.computer.common import OSType, CommandOutput
|
|
||||||
from custom_components.easy_computer_manager.computer.formatter import format_gnome_monitors_args, format_pactl_commands
|
|
||||||
from custom_components.easy_computer_manager.computer.parser import parse_gnome_monitors_output, parse_pactl_output, \
|
|
||||||
parse_bluetoothctl
|
|
||||||
from custom_components.easy_computer_manager.computer.ssh_client import SSHClient
|
|
||||||
|
|
||||||
|
|
||||||
class Computer:
|
|
||||||
def __init__(self, host: str, mac: str, username: str, password: str, port: int = 22,
|
|
||||||
dualboot: bool = False) -> None:
|
|
||||||
"""Initialize the Computer object."""
|
|
||||||
self.initialized = False # used to avoid duplicated ssh connections
|
|
||||||
|
|
||||||
self.host = host
|
|
||||||
self.mac = mac
|
|
||||||
self.username = username
|
|
||||||
self._password = password
|
|
||||||
self.port = port
|
|
||||||
self.dualboot = dualboot
|
|
||||||
|
|
||||||
self.operating_system: Optional[OSType] = None
|
|
||||||
self.operating_system_version: Optional[str] = None
|
|
||||||
self.desktop_environment: Optional[str] = None
|
|
||||||
self.windows_entry_grub: Optional[str] = None
|
|
||||||
self.monitors_config: Optional[Dict[str, Any]] = None
|
|
||||||
self.audio_config: Dict[str, Optional[Dict]] = {}
|
|
||||||
self.bluetooth_devices: Dict[str, Any] = {}
|
|
||||||
|
|
||||||
self._connection: SSHClient = SSHClient(host, username, password, port)
|
|
||||||
asyncio.create_task(self._connection.connect(computer=self))
|
|
||||||
|
|
||||||
async def update(self, state: Optional[bool] = True, timeout: Optional[int] = 2) -> None:
|
|
||||||
"""Update computer details."""
|
|
||||||
if not state or not await self.is_on():
|
|
||||||
LOGGER.debug("Computer is off, skipping update")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Ensure connection is established before updating
|
|
||||||
await self._ensure_connection_alive(timeout)
|
|
||||||
|
|
||||||
# Update tasks
|
|
||||||
await asyncio.gather(
|
|
||||||
self._update_operating_system(),
|
|
||||||
self._update_operating_system_version(),
|
|
||||||
self._update_desktop_environment(),
|
|
||||||
self._update_windows_entry_grub(),
|
|
||||||
self._update_monitors_config(),
|
|
||||||
self._update_audio_config(),
|
|
||||||
self._update_bluetooth_devices()
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _ensure_connection_alive(self, timeout: int) -> None:
|
|
||||||
"""Ensure the SSH connection is alive, reconnect if necessary."""
|
|
||||||
for _ in range(timeout * 4):
|
|
||||||
if self._connection.is_connection_alive():
|
|
||||||
return
|
|
||||||
await asyncio.sleep(0.25)
|
|
||||||
|
|
||||||
if not self._connection.is_connection_alive():
|
|
||||||
LOGGER.debug(f"Reconnecting to {self.host}")
|
|
||||||
await self._connection.connect()
|
|
||||||
if not self._connection.is_connection_alive():
|
|
||||||
LOGGER.debug(f"Failed to connect to {self.host} after timeout={timeout}s")
|
|
||||||
raise ConnectionError("SSH connection could not be re-established")
|
|
||||||
|
|
||||||
async def _update_operating_system(self) -> None:
|
|
||||||
"""Update the operating system information."""
|
|
||||||
self.operating_system = await self._detect_operating_system()
|
|
||||||
|
|
||||||
async def _update_operating_system_version(self) -> None:
|
|
||||||
"""Update the operating system version."""
|
|
||||||
self.operating_system_version = (await self.run_action("operating_system_version")).output
|
|
||||||
|
|
||||||
async def _update_desktop_environment(self) -> None:
|
|
||||||
"""Update the desktop environment information."""
|
|
||||||
self.desktop_environment = (await self.run_action("desktop_environment")).output.lower()
|
|
||||||
|
|
||||||
async def _update_windows_entry_grub(self) -> None:
|
|
||||||
"""Update Windows entry in GRUB (if applicable)."""
|
|
||||||
self.windows_entry_grub = (await self.run_action("get_windows_entry_grub")).output
|
|
||||||
|
|
||||||
async def _update_monitors_config(self) -> None:
|
|
||||||
"""Update monitors configuration."""
|
|
||||||
if self.operating_system == OSType.LINUX:
|
|
||||||
output = (await self.run_action("get_monitors_config")).output
|
|
||||||
self.monitors_config = parse_gnome_monitors_output(output)
|
|
||||||
# TODO: Implement for Windows if needed
|
|
||||||
|
|
||||||
async def _update_audio_config(self) -> None:
|
|
||||||
"""Update audio configuration."""
|
|
||||||
speakers_output = (await self.run_action("get_speakers")).output
|
|
||||||
microphones_output = (await self.run_action("get_microphones")).output
|
|
||||||
|
|
||||||
if self.operating_system == OSType.LINUX:
|
|
||||||
self.audio_config = parse_pactl_output(speakers_output, microphones_output)
|
|
||||||
# TODO: Implement for Windows
|
|
||||||
|
|
||||||
async def _update_bluetooth_devices(self) -> None:
|
|
||||||
"""Update Bluetooth devices list."""
|
|
||||||
if self.operating_system == OSType.LINUX:
|
|
||||||
self.bluetooth_devices = parse_bluetoothctl(await self.run_action("get_bluetooth_devices"))
|
|
||||||
# TODO: Implement for Windows
|
|
||||||
|
|
||||||
async def _detect_operating_system(self) -> OSType:
|
|
||||||
"""Detect the operating system by running a uname command."""
|
|
||||||
result = await self.run_manually("uname")
|
|
||||||
return OSType.LINUX if result.successful() else OSType.WINDOWS
|
|
||||||
|
|
||||||
async def is_on(self, timeout: int = 1) -> bool:
|
|
||||||
"""Check if the computer is on by pinging it."""
|
|
||||||
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self.host)]
|
|
||||||
proc = await asyncio.create_subprocess_exec(
|
|
||||||
*ping_cmd,
|
|
||||||
stdout=asyncio.subprocess.DEVNULL,
|
|
||||||
stderr=asyncio.subprocess.DEVNULL
|
|
||||||
)
|
|
||||||
await proc.communicate()
|
|
||||||
return proc.returncode == 0
|
|
||||||
|
|
||||||
async def start(self) -> None:
|
|
||||||
"""Start the computer using Wake-on-LAN."""
|
|
||||||
send_magic_packet(self.mac)
|
|
||||||
|
|
||||||
async def shutdown(self) -> None:
|
|
||||||
"""Shutdown the computer."""
|
|
||||||
await self.run_action("shutdown")
|
|
||||||
|
|
||||||
async def restart(self, from_os: Optional[OSType] = None, to_os: Optional[OSType] = None) -> None:
|
|
||||||
"""Restart the computer."""
|
|
||||||
await self.run_action("restart")
|
|
||||||
|
|
||||||
async def put_to_sleep(self) -> None:
|
|
||||||
"""Put the computer to sleep."""
|
|
||||||
await self.run_action("sleep")
|
|
||||||
|
|
||||||
async def set_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
|
||||||
"""Set monitors configuration."""
|
|
||||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
|
||||||
await self.run_action("set_monitors_config", params={"args": format_gnome_monitors_args(monitors_config)})
|
|
||||||
|
|
||||||
async def set_audio_config(self, volume: Optional[int] = None, mute: Optional[bool] = None,
|
|
||||||
input_device: Optional[str] = None, output_device: Optional[str] = None) -> None:
|
|
||||||
"""Set audio configuration."""
|
|
||||||
if self.is_linux() and self.desktop_environment == 'gnome':
|
|
||||||
pactl_commands = format_pactl_commands(self.audio_config, volume, mute, input_device, output_device)
|
|
||||||
for command in pactl_commands:
|
|
||||||
await self.run_action("set_audio_config", params={"args": command})
|
|
||||||
|
|
||||||
async def install_nircmd(self) -> None:
|
|
||||||
"""Install NirCmd tool (Windows specific)."""
|
|
||||||
install_path = f"C:\\Users\\{self.username}\\AppData\\Local\\EasyComputerManager"
|
|
||||||
await self.run_action("install_nircmd", params={
|
|
||||||
"download_url": "https://www.nirsoft.net/utils/nircmd.zip",
|
|
||||||
"install_path": install_path
|
|
||||||
})
|
|
||||||
|
|
||||||
async def steam_big_picture(self, action: str) -> None:
|
|
||||||
"""Start, stop, or exit Steam Big Picture mode."""
|
|
||||||
await self.run_action(f"{action}_steam_big_picture")
|
|
||||||
|
|
||||||
async def run_action(self, id: str, params: Optional[Dict[str, Any]] = None,
|
|
||||||
raise_on_error: bool = False) -> CommandOutput:
|
|
||||||
"""Run a predefined action via SSH."""
|
|
||||||
params = params or {}
|
|
||||||
|
|
||||||
action = const.ACTIONS.get(id)
|
|
||||||
if not action:
|
|
||||||
LOGGER.error(f"Action {id} not found.")
|
|
||||||
return CommandOutput("", 1, "", "Action not found")
|
|
||||||
|
|
||||||
if not self.operating_system:
|
|
||||||
self.operating_system = await self._detect_operating_system()
|
|
||||||
|
|
||||||
os_commands = action.get(self.operating_system.lower())
|
|
||||||
if not os_commands:
|
|
||||||
raise ValueError(f"Action {id} not supported for OS: {self.operating_system}")
|
|
||||||
|
|
||||||
commands = os_commands if isinstance(os_commands, list) else os_commands.get("commands",
|
|
||||||
[os_commands.get("command")])
|
|
||||||
required_params = []
|
|
||||||
if "params" in os_commands:
|
|
||||||
required_params = os_commands.get("params", [])
|
|
||||||
|
|
||||||
# Validate parameters
|
|
||||||
if sorted(required_params) != sorted(params.keys()):
|
|
||||||
raise ValueError(f"Invalid/missing parameters for action: {id}")
|
|
||||||
|
|
||||||
for command in commands:
|
|
||||||
for param, value in params.items():
|
|
||||||
command = command.replace(f"%{param}%", str(value))
|
|
||||||
|
|
||||||
result = await self.run_manually(command)
|
|
||||||
if result.successful():
|
|
||||||
return result
|
|
||||||
elif raise_on_error:
|
|
||||||
raise ValueError(f"Command failed: {command}")
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
async def run_manually(self, command: str) -> CommandOutput:
|
|
||||||
"""Run a custom command manually via SSH."""
|
|
||||||
return await self._connection.execute_command(command)
|
|
@ -1,18 +0,0 @@
|
|||||||
from enum import Enum
|
|
||||||
|
|
||||||
|
|
||||||
class OSType(str, Enum):
|
|
||||||
WINDOWS = "Windows"
|
|
||||||
LINUX = "Linux"
|
|
||||||
MACOS = "MacOS"
|
|
||||||
|
|
||||||
|
|
||||||
class CommandOutput:
|
|
||||||
def __init__(self, command: str, return_code: int, output: str, error: str) -> None:
|
|
||||||
self.command = command
|
|
||||||
self.return_code = return_code
|
|
||||||
self.output = output.strip()
|
|
||||||
self.error = error.strip()
|
|
||||||
|
|
||||||
def successful(self) -> bool:
|
|
||||||
return self.return_code == 0
|
|
@ -1,62 +0,0 @@
|
|||||||
def format_gnome_monitors_args(monitors_config: dict):
|
|
||||||
args = []
|
|
||||||
|
|
||||||
monitors_config = monitors_config.get('monitors_config', {})
|
|
||||||
|
|
||||||
for monitor, settings in monitors_config.items():
|
|
||||||
if settings.get('enabled', False):
|
|
||||||
args.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
|
||||||
|
|
||||||
if 'position' in settings:
|
|
||||||
args.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
|
||||||
|
|
||||||
if 'mode' in settings:
|
|
||||||
args.extend(['-m', settings["mode"]])
|
|
||||||
|
|
||||||
if 'scale' in settings:
|
|
||||||
args.extend(['-s', str(settings["scale"])])
|
|
||||||
|
|
||||||
if 'transform' in settings:
|
|
||||||
args.extend(['-t', settings["transform"]])
|
|
||||||
|
|
||||||
return ' '.join(args)
|
|
||||||
|
|
||||||
|
|
||||||
def format_pactl_commands(current_config: {}, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
|
||||||
output_device: str = "@DEFAULT_SINK@"):
|
|
||||||
"""Change audio configuration on the host system."""
|
|
||||||
|
|
||||||
commands = []
|
|
||||||
|
|
||||||
def get_device_id(device_type, user_device):
|
|
||||||
for device in current_config[device_type]:
|
|
||||||
if device['description'] == user_device:
|
|
||||||
return device['name']
|
|
||||||
return user_device
|
|
||||||
|
|
||||||
# Set default sink and source if not specified
|
|
||||||
if not output_device:
|
|
||||||
output_device = "@DEFAULT_SINK@"
|
|
||||||
if not input_device:
|
|
||||||
input_device = "@DEFAULT_SOURCE@"
|
|
||||||
|
|
||||||
# Set default sink if specified
|
|
||||||
if output_device and output_device != "@DEFAULT_SINK@":
|
|
||||||
output_device = get_device_id('sinks', output_device)
|
|
||||||
commands.append(f"set-default-sink {output_device}")
|
|
||||||
|
|
||||||
# Set default source if specified
|
|
||||||
if input_device and input_device != "@DEFAULT_SOURCE@":
|
|
||||||
input_device = get_device_id('sources', input_device)
|
|
||||||
commands.append(f"set-default-source {input_device}")
|
|
||||||
|
|
||||||
# Set sink volume if specified
|
|
||||||
if volume is not None:
|
|
||||||
commands.append(f"set-sink-volume {output_device} {volume}%")
|
|
||||||
|
|
||||||
# Set sink and source mute status if specified
|
|
||||||
if mute is not None:
|
|
||||||
commands.append(f"set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
|
||||||
commands.append(f"set-source-mute {input_device} {'yes' if mute else 'no'}")
|
|
||||||
|
|
||||||
return commands
|
|
@ -1,168 +0,0 @@
|
|||||||
import re
|
|
||||||
|
|
||||||
from custom_components.easy_computer_manager import LOGGER
|
|
||||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
|
||||||
|
|
||||||
|
|
||||||
def parse_gnome_monitors_output(config: str) -> list:
|
|
||||||
"""
|
|
||||||
Parse the GNOME monitors configuration.
|
|
||||||
|
|
||||||
:param config:
|
|
||||||
The output of the gnome-monitor-config list command.
|
|
||||||
|
|
||||||
:type config: str
|
|
||||||
|
|
||||||
:returns: list
|
|
||||||
The parsed monitors configuration.
|
|
||||||
"""
|
|
||||||
|
|
||||||
monitors = []
|
|
||||||
current_monitor = None
|
|
||||||
|
|
||||||
for line in config.split('\n'):
|
|
||||||
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
|
||||||
if monitor_match:
|
|
||||||
if current_monitor:
|
|
||||||
monitors.append(current_monitor)
|
|
||||||
source, status = monitor_match.groups()
|
|
||||||
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
|
||||||
elif current_monitor:
|
|
||||||
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
|
||||||
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
|
||||||
if display_name_match:
|
|
||||||
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
|
||||||
elif resolution_match:
|
|
||||||
# Don't include resolutions under 1280x720
|
|
||||||
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
|
||||||
|
|
||||||
# If there are already resolutions in the list, check if the framerate between the last is >1
|
|
||||||
if len(current_monitor['resolutions']) > 0:
|
|
||||||
last_resolution = current_monitor['resolutions'][-1]
|
|
||||||
last_resolution_size = last_resolution.split('@')[0]
|
|
||||||
this_resolution_size = resolution_match.group(1).split('@')[0]
|
|
||||||
|
|
||||||
# Only truncate some framerates if the resolution are the same
|
|
||||||
if last_resolution_size == this_resolution_size:
|
|
||||||
last_resolution_framerate = float(last_resolution.split('@')[1])
|
|
||||||
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
|
||||||
|
|
||||||
# If the difference between the last resolution framerate and this one is >1, ignore it
|
|
||||||
if last_resolution_framerate - 1 > this_resolution_framerate:
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
else:
|
|
||||||
# If the resolution is different, this adds the new resolution
|
|
||||||
# to the list without truncating
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
else:
|
|
||||||
# This is the first resolution, add it to the list
|
|
||||||
current_monitor['resolutions'].append(resolution_match.group(1))
|
|
||||||
|
|
||||||
if current_monitor:
|
|
||||||
monitors.append(current_monitor)
|
|
||||||
|
|
||||||
return monitors
|
|
||||||
|
|
||||||
|
|
||||||
def parse_pactl_output(config_speakers: str, config_microphones: str) -> dict[str, list]:
|
|
||||||
"""
|
|
||||||
Parse the pactl audio configuration.
|
|
||||||
|
|
||||||
:param config_speakers:
|
|
||||||
The output of the pactl list sinks command.
|
|
||||||
:param config_microphones:
|
|
||||||
The output of the pactl list sources command.
|
|
||||||
|
|
||||||
:type config_speakers: str
|
|
||||||
:type config_microphones: str
|
|
||||||
|
|
||||||
:returns: dict
|
|
||||||
The parsed audio configuration.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
config = {'speakers': [], 'microphones': []}
|
|
||||||
|
|
||||||
def parse_device_info(lines, device_type):
|
|
||||||
devices = []
|
|
||||||
current_device = {}
|
|
||||||
|
|
||||||
for line in lines:
|
|
||||||
if line.startswith(f"{device_type} #"):
|
|
||||||
if current_device and "Monitor" not in current_device['description']:
|
|
||||||
devices.append(current_device)
|
|
||||||
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
|
||||||
elif line.startswith(" Name:"):
|
|
||||||
current_device['name'] = line.split(":")[1].strip()
|
|
||||||
elif line.startswith(" State:"):
|
|
||||||
current_device['state'] = line.split(":")[1].strip()
|
|
||||||
elif line.startswith(" Description:"):
|
|
||||||
current_device['description'] = line.split(":")[1].strip()
|
|
||||||
|
|
||||||
if current_device:
|
|
||||||
devices.append(current_device)
|
|
||||||
|
|
||||||
return devices
|
|
||||||
|
|
||||||
config['speakers'] = parse_device_info(config_speakers.split('\n'), 'Sink')
|
|
||||||
config['microphones'] = parse_device_info(config_microphones.split('\n'), 'Source')
|
|
||||||
|
|
||||||
return config
|
|
||||||
|
|
||||||
|
|
||||||
def parse_bluetoothctl(command: CommandOutput, connected_devices_only: bool = True,
|
|
||||||
return_as_string: bool = False) -> list | str:
|
|
||||||
"""Parse the bluetoothctl info command.
|
|
||||||
|
|
||||||
:param command:
|
|
||||||
The command output.
|
|
||||||
:param connected_devices_only:
|
|
||||||
Only return connected devices.
|
|
||||||
Will return all devices, connected or not, if False.
|
|
||||||
|
|
||||||
:type command: :class: CommandOutput
|
|
||||||
:type connected_devices_only: bool
|
|
||||||
|
|
||||||
:returns: str | list
|
|
||||||
The parsed bluetooth devices.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not command.successful():
|
|
||||||
if command.output.__contains__("Missing device address argument"): # Means no devices are connected
|
|
||||||
return "" if return_as_string else []
|
|
||||||
else:
|
|
||||||
LOGGER.warning(f"Cannot retrieve bluetooth devices, make sure bluetoothctl is installed")
|
|
||||||
return "" if return_as_string else []
|
|
||||||
|
|
||||||
devices = []
|
|
||||||
current_device = None
|
|
||||||
|
|
||||||
for line in command.output.split('\n'):
|
|
||||||
if line.startswith('Device'):
|
|
||||||
if current_device is not None:
|
|
||||||
devices.append({
|
|
||||||
"address": current_device,
|
|
||||||
"name": current_name,
|
|
||||||
"connected": current_connected
|
|
||||||
})
|
|
||||||
current_device = line.split()[1]
|
|
||||||
current_name = None
|
|
||||||
current_connected = None
|
|
||||||
elif 'Name:' in line:
|
|
||||||
current_name = line.split(': ', 1)[1]
|
|
||||||
elif 'Connected:' in line:
|
|
||||||
current_connected = line.split(': ')[1] == 'yes'
|
|
||||||
|
|
||||||
# Add the last device if any
|
|
||||||
if current_device is not None:
|
|
||||||
devices.append({
|
|
||||||
"address": current_device,
|
|
||||||
"name": current_name,
|
|
||||||
"connected": current_connected
|
|
||||||
})
|
|
||||||
|
|
||||||
if connected_devices_only:
|
|
||||||
devices = [device for device in devices if device["connected"] == True]
|
|
||||||
|
|
||||||
return devices
|
|
@ -1,104 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import paramiko
|
|
||||||
|
|
||||||
from custom_components.easy_computer_manager import LOGGER
|
|
||||||
from custom_components.easy_computer_manager.computer import CommandOutput
|
|
||||||
|
|
||||||
|
|
||||||
class SSHClient:
|
|
||||||
def __init__(self, host: str, username: str, password: Optional[str] = None, port: int = 22):
|
|
||||||
self.host = host
|
|
||||||
self.username = username
|
|
||||||
self._password = password
|
|
||||||
self.port = port
|
|
||||||
self._connection: Optional[paramiko.SSHClient] = None
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
|
||||||
await self.connect()
|
|
||||||
return self
|
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
|
||||||
self.disconnect()
|
|
||||||
|
|
||||||
async def connect(self, retried: bool = False, computer: Optional['Computer'] = None) -> None:
|
|
||||||
"""Open an SSH connection using Paramiko asynchronously."""
|
|
||||||
if self.is_connection_alive():
|
|
||||||
LOGGER.debug(f"Connection to {self.host} is already active.")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.disconnect() # Ensure any previous connection is closed
|
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
client = paramiko.SSHClient()
|
|
||||||
|
|
||||||
# Set missing host key policy to automatically accept unknown host keys
|
|
||||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Offload the blocking connect call to a thread
|
|
||||||
await loop.run_in_executor(None, self._blocking_connect, client)
|
|
||||||
self._connection = client
|
|
||||||
LOGGER.debug(f"Connected to {self.host}")
|
|
||||||
|
|
||||||
except (OSError, paramiko.SSHException) as exc:
|
|
||||||
LOGGER.debug(f"Failed to connect to {self.host}: {exc}")
|
|
||||||
if not retried:
|
|
||||||
LOGGER.debug(f"Retrying connection to {self.host}...")
|
|
||||||
await self.connect(retried=True) # Retry only once
|
|
||||||
|
|
||||||
finally:
|
|
||||||
if computer is not None and hasattr(computer, "initialized"):
|
|
||||||
computer.initialized = True
|
|
||||||
|
|
||||||
def disconnect(self) -> None:
|
|
||||||
"""Close the SSH connection."""
|
|
||||||
if self._connection:
|
|
||||||
self._connection.close()
|
|
||||||
LOGGER.debug(f"Disconnected from {self.host}")
|
|
||||||
self._connection = None
|
|
||||||
|
|
||||||
def _blocking_connect(self, client: paramiko.SSHClient):
|
|
||||||
"""Perform the blocking SSH connection using Paramiko."""
|
|
||||||
client.connect(
|
|
||||||
hostname=self.host,
|
|
||||||
username=self.username,
|
|
||||||
password=self._password,
|
|
||||||
port=self.port,
|
|
||||||
look_for_keys=False, # Set this to True if using private keys
|
|
||||||
allow_agent=False
|
|
||||||
)
|
|
||||||
|
|
||||||
async def execute_command(self, command: str) -> CommandOutput:
|
|
||||||
"""Execute a command on the SSH server asynchronously."""
|
|
||||||
if not self.is_connection_alive():
|
|
||||||
LOGGER.debug(f"Connection to {self.host} is not alive. Reconnecting...")
|
|
||||||
await self.connect()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Offload command execution to avoid blocking
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
stdin, stdout, stderr = await loop.run_in_executor(None, self._connection.exec_command, command)
|
|
||||||
|
|
||||||
exit_status = stdout.channel.recv_exit_status()
|
|
||||||
return CommandOutput(command, exit_status, stdout.read().decode(), stderr.read().decode())
|
|
||||||
|
|
||||||
except (paramiko.SSHException, EOFError) as exc:
|
|
||||||
LOGGER.error(f"Failed to execute command on {self.host}: {exc}")
|
|
||||||
return CommandOutput(command, -1, "", "")
|
|
||||||
|
|
||||||
def is_connection_alive(self) -> bool:
|
|
||||||
"""Check if the SSH connection is still alive."""
|
|
||||||
if self._connection is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
transport = self._connection.get_transport()
|
|
||||||
transport.send_ignore()
|
|
||||||
|
|
||||||
self._connection.exec_command('ls', timeout=1)
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
return False
|
|
@ -1,40 +0,0 @@
|
|||||||
async def format_debug_information(computer: 'Computer'): # importing Computer causes circular import (how to fix?)
|
|
||||||
"""Return debug information about the host system."""
|
|
||||||
|
|
||||||
data = {
|
|
||||||
'os': {
|
|
||||||
'name': computer.operating_system,
|
|
||||||
'version': computer.operating_system_version,
|
|
||||||
'desktop_environment': computer.desktop_environment
|
|
||||||
},
|
|
||||||
'connection': {
|
|
||||||
'host': computer.host,
|
|
||||||
'mac': computer.mac,
|
|
||||||
'username': computer.username,
|
|
||||||
'port': computer.port,
|
|
||||||
'dualboot': computer.dualboot,
|
|
||||||
'is_on': await computer.is_on(),
|
|
||||||
'is_connected': computer._connection.is_connection_alive()
|
|
||||||
},
|
|
||||||
'grub': {
|
|
||||||
'windows_entry': computer.windows_entry_grub
|
|
||||||
},
|
|
||||||
'audio': {
|
|
||||||
'speakers': computer.audio_config.get('speakers'),
|
|
||||||
'microphones': computer.audio_config.get('microphones')
|
|
||||||
},
|
|
||||||
'monitors': computer.monitors_config,
|
|
||||||
'bluetooth_devices': computer.bluetooth_devices
|
|
||||||
}
|
|
||||||
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
def get_bluetooth_devices_as_str(computer: 'Computer') -> str:
|
|
||||||
"""Return the bluetooth devices as a string."""
|
|
||||||
devices = computer.bluetooth_devices
|
|
||||||
|
|
||||||
if not devices:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
return "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
|
@ -7,8 +7,9 @@ from typing import Any
|
|||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
from homeassistant import config_entries, exceptions
|
from homeassistant import config_entries, exceptions
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
from paramiko.ssh_exception import AuthenticationException
|
||||||
|
|
||||||
from .computer import Computer
|
from . import utils
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -39,8 +40,6 @@ class Hub:
|
|||||||
self._name = host
|
self._name = host
|
||||||
self._id = host.lower()
|
self._id = host.lower()
|
||||||
|
|
||||||
self.computer = Computer(host, "", username, password, port)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hub_id(self) -> str:
|
def hub_id(self) -> str:
|
||||||
"""ID for dummy."""
|
"""ID for dummy."""
|
||||||
@ -49,11 +48,9 @@ class Hub:
|
|||||||
async def test_connection(self) -> bool:
|
async def test_connection(self) -> bool:
|
||||||
"""Test connectivity to the computer is OK."""
|
"""Test connectivity to the computer is OK."""
|
||||||
try:
|
try:
|
||||||
# TODO: check if reachable
|
return utils.test_connection(
|
||||||
_LOGGER.info("Testing connection to %s", self._host)
|
utils.create_ssh_connection(self._host, self._username, self._password, self._port))
|
||||||
return True
|
except AuthenticationException:
|
||||||
|
|
||||||
except Exception:
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -66,7 +63,6 @@ async def validate_input(hass: HomeAssistant, data: dict) -> dict[str, Any]:
|
|||||||
|
|
||||||
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
|
hub = Hub(hass, data["host"], data["username"], data["password"], data["port"])
|
||||||
|
|
||||||
_LOGGER.info("Validating configuration")
|
|
||||||
if not await hub.test_connection():
|
if not await hub.test_connection():
|
||||||
raise CannotConnect
|
raise CannotConnect
|
||||||
|
|
||||||
@ -85,7 +81,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
try:
|
try:
|
||||||
info = await validate_input(self.hass, user_input)
|
info = await validate_input(self.hass, user_input)
|
||||||
return self.async_create_entry(title=info["title"], data=user_input)
|
return self.async_create_entry(title=info["title"], data=user_input)
|
||||||
except (CannotConnect, InvalidHost) as ex:
|
except (AuthenticationException, CannotConnect, InvalidHost) as ex:
|
||||||
errors["base"] = str(ex)
|
errors["base"] = str(ex)
|
||||||
except Exception as ex: # pylint: disable=broad-except
|
except Exception as ex: # pylint: disable=broad-except
|
||||||
_LOGGER.exception("Unexpected exception: %s", ex)
|
_LOGGER.exception("Unexpected exception: %s", ex)
|
||||||
|
@ -11,87 +11,3 @@ SERVICE_CHANGE_MONITORS_CONFIG = "change_monitors_config"
|
|||||||
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
|
SERVICE_STEAM_BIG_PICTURE = "steam_big_picture"
|
||||||
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
|
SERVICE_CHANGE_AUDIO_CONFIG = "change_audio_config"
|
||||||
SERVICE_DEBUG_INFO = "debug_info"
|
SERVICE_DEBUG_INFO = "debug_info"
|
||||||
|
|
||||||
|
|
||||||
ACTIONS = {
|
|
||||||
"operating_system": {
|
|
||||||
"linux": ["uname"]
|
|
||||||
},
|
|
||||||
"operating_system_version": {
|
|
||||||
"windows": ['for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i'],
|
|
||||||
"linux": ["awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo", "lsb_release -a | awk '/Description/ {print $2, $3, $4}'"]
|
|
||||||
},
|
|
||||||
"desktop_environment": {
|
|
||||||
"linux": ["for session in $(ls /usr/bin/*session 2>/dev/null); do basename $session | sed 's/-session//'; done | grep -E 'gnome|kde|xfce|mate|lxde|cinnamon|budgie|unity' | head -n 1"],
|
|
||||||
"windows": ["echo Windows"]
|
|
||||||
},
|
|
||||||
"shutdown": {
|
|
||||||
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"],
|
|
||||||
"linux": ["sudo /sbin/shutdown -h now", "sudo /sbin/init 0", "sudo /usr/bin/systemctl poweroff"]
|
|
||||||
},
|
|
||||||
"restart": {
|
|
||||||
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"],
|
|
||||||
"linux": ["sudo /sbin/shutdown -r now", "sudo /sbin/init 6", "sudo /usr/bin/systemctl reboot"]
|
|
||||||
},
|
|
||||||
"sleep": {
|
|
||||||
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"],
|
|
||||||
"linux": ["sudo /usr/bin/systemctl suspend", "sudo /usr/sbin/pm-suspend"]
|
|
||||||
},
|
|
||||||
"get_windows_entry_grub": {
|
|
||||||
"linux": ["sudo /usr/bin/cat /etc/grub2.cfg | awk -F \"'\" '/windows/ {print $2}'",
|
|
||||||
"sudo /usr/bin/cat /etc/grub.cfg | awk -F \"'\" '/windows/ {print $2}'"]
|
|
||||||
},
|
|
||||||
"set_grub_entry": {
|
|
||||||
"linux": {
|
|
||||||
"commands": ["sudo /usr/sbin/grub-reboot %grub-entry%", "sudo /usr/sbin/grub2-reboot %grub-entry%"],
|
|
||||||
"params": ["grub-entry"],
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"get_monitors_config": {
|
|
||||||
"linux": ["gnome-monitor-config list"]
|
|
||||||
},
|
|
||||||
"set_monitors_config": {
|
|
||||||
"linux": {
|
|
||||||
"gnome": {
|
|
||||||
"command": "gnome-monitor-config set %args%",
|
|
||||||
"params": ["args"]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"get_speakers": {
|
|
||||||
"linux": ["LANG=en_US.UTF-8 pactl list sinks"]
|
|
||||||
},
|
|
||||||
"get_microphones": {
|
|
||||||
"linux": ["LANG=en_US.UTF-8 pactl list sources"]
|
|
||||||
},
|
|
||||||
"set_audio_config": {
|
|
||||||
"linux": {
|
|
||||||
"command": "LANG=en_US.UTF-8 pactl %args%",
|
|
||||||
"params": ["args"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"get_bluetooth_devices": {
|
|
||||||
"linux": {
|
|
||||||
"command": "bluetoothctl info",
|
|
||||||
"raise_on_error": False,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"install_nirmcd": {
|
|
||||||
"windows": {
|
|
||||||
"command": "powershell -Command \"Invoke-WebRequest -Uri %download_url% -OutFile %install_path%\\nircmd.zip -UseBasicParsing; Expand-Archive %install_path%\\nircmd.zip -DestinationPath %install_path%; Remove-Item %install_path%\\nircmd.zip\"",
|
|
||||||
"params": ["download_url", "install_path"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"start_steam_big_picture": {
|
|
||||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
|
||||||
"windows": "start steam://open/bigpicture"
|
|
||||||
},
|
|
||||||
"stop_steam_big_picture": {
|
|
||||||
"linux": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
|
||||||
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
|
||||||
},
|
|
||||||
"exit_steam_big_picture": {
|
|
||||||
"linux": "", # TODO: find a way to exit steam big picture
|
|
||||||
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
|
||||||
},
|
|
||||||
}
|
|
@ -2,7 +2,8 @@
|
|||||||
"domain": "easy_computer_manager",
|
"domain": "easy_computer_manager",
|
||||||
"name": "Easy Computer Manager",
|
"name": "Easy Computer Manager",
|
||||||
"codeowners": [
|
"codeowners": [
|
||||||
"@M4TH1EU"
|
"@M4TH1EU",
|
||||||
|
"@ntilley905"
|
||||||
],
|
],
|
||||||
"config_flow": true,
|
"config_flow": true,
|
||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
@ -10,8 +11,10 @@
|
|||||||
"iot_class": "local_polling",
|
"iot_class": "local_polling",
|
||||||
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
|
"issue_tracker": "https://github.com/M4TH1EU/HA-EasyComputerManager/issues",
|
||||||
"requirements": [
|
"requirements": [
|
||||||
|
"construct==2.10.70",
|
||||||
"wakeonlan==3.1.0",
|
"wakeonlan==3.1.0",
|
||||||
"asyncssh==2.16.0"
|
"fabric2==3.2.2",
|
||||||
|
"paramiko==3.4.0"
|
||||||
],
|
],
|
||||||
"version": "2.0.0"
|
"version": "1.3.0"
|
||||||
}
|
}
|
||||||
|
@ -1,53 +1,80 @@
|
|||||||
|
# Some snippets of code are from the official wake_on_lan integration (inspiration for this custom component)
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any, Dict
|
import logging
|
||||||
|
import subprocess as sp
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
from homeassistant.components.switch import SwitchEntity
|
import wakeonlan
|
||||||
|
from homeassistant.components.switch import (SwitchEntity)
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
CONF_HOST, CONF_MAC, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME,
|
CONF_HOST,
|
||||||
)
|
CONF_MAC,
|
||||||
|
CONF_NAME,
|
||||||
|
CONF_PASSWORD,
|
||||||
|
CONF_PORT,
|
||||||
|
CONF_USERNAME, )
|
||||||
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
from homeassistant.core import HomeAssistant, ServiceResponse, SupportsResponse
|
||||||
from homeassistant.helpers import entity_platform, device_registry as dr
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
|
from homeassistant.helpers import (
|
||||||
|
device_registry as dr,
|
||||||
|
entity_platform,
|
||||||
|
)
|
||||||
from homeassistant.helpers.config_validation import make_entity_service_schema
|
from homeassistant.helpers.config_validation import make_entity_service_schema
|
||||||
from homeassistant.helpers.device_registry import DeviceInfo
|
from homeassistant.helpers.device_registry import DeviceInfo
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
|
from paramiko.ssh_exception import AuthenticationException
|
||||||
|
|
||||||
from .computer import OSType, Computer
|
from . import utils
|
||||||
from .computer.utils import format_debug_information, get_bluetooth_devices_as_str
|
from .const import SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP, \
|
||||||
from .const import (
|
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER, SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, \
|
||||||
DOMAIN, SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, SERVICE_PUT_COMPUTER_TO_SLEEP,
|
SERVICE_CHANGE_MONITORS_CONFIG, SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO, DOMAIN
|
||||||
SERVICE_START_COMPUTER_TO_WINDOWS, SERVICE_RESTART_COMPUTER,
|
|
||||||
SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, SERVICE_CHANGE_MONITORS_CONFIG,
|
_LOGGER = logging.getLogger(__name__)
|
||||||
SERVICE_STEAM_BIG_PICTURE, SERVICE_CHANGE_AUDIO_CONFIG, SERVICE_DEBUG_INFO
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(
|
async def async_setup_entry(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
config: ConfigEntry,
|
config: ConfigEntry,
|
||||||
async_add_entities: AddEntitiesCallback
|
async_add_entities: AddEntitiesCallback,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the computer switch from a config entry."""
|
# Retrieve the data from the config flow
|
||||||
mac_address = config.data[CONF_MAC]
|
mac_address: str = config.data.get(CONF_MAC)
|
||||||
host = config.data[CONF_HOST]
|
# broadcast_address: str | None = config.data.get(CONF_BROADCAST_ADDRESS)
|
||||||
name = config.data[CONF_NAME]
|
# broadcast_port: int | None = config.data.get(CONF_BROADCAST_PORT)
|
||||||
dualboot = config.data.get("dualboot", False)
|
host: str = config.data.get(CONF_HOST)
|
||||||
username = config.data[CONF_USERNAME]
|
name: str = config.data.get(CONF_NAME)
|
||||||
password = config.data[CONF_PASSWORD]
|
dualboot: bool = config.data.get("dualboot")
|
||||||
port = config.data.get(CONF_PORT)
|
username: str = config.data.get(CONF_USERNAME)
|
||||||
|
password: str = config.data.get(CONF_PASSWORD)
|
||||||
|
port: int | None = config.data.get(CONF_PORT)
|
||||||
|
|
||||||
|
# Register the computer switch
|
||||||
async_add_entities(
|
async_add_entities(
|
||||||
[ComputerSwitch(hass, name, host, mac_address, dualboot, username, password, port)],
|
[
|
||||||
True
|
ComputerSwitch(
|
||||||
|
hass,
|
||||||
|
name,
|
||||||
|
host,
|
||||||
|
mac_address,
|
||||||
|
dualboot,
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
port,
|
||||||
|
),
|
||||||
|
],
|
||||||
|
host is not None,
|
||||||
)
|
)
|
||||||
|
|
||||||
platform = entity_platform.async_get_current_platform()
|
platform = entity_platform.async_get_current_platform()
|
||||||
|
|
||||||
# Service registrations
|
# Synthax : (service_name: str, schema: dict, supports_response: SupportsResponse)
|
||||||
services = [
|
services = [
|
||||||
|
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||||
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
(SERVICE_RESTART_TO_WINDOWS_FROM_LINUX, {}, SupportsResponse.NONE),
|
||||||
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
|
(SERVICE_RESTART_TO_LINUX_FROM_WINDOWS, {}, SupportsResponse.NONE),
|
||||||
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
|
(SERVICE_PUT_COMPUTER_TO_SLEEP, {}, SupportsResponse.NONE),
|
||||||
@ -64,18 +91,18 @@ async def async_setup_entry(
|
|||||||
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
|
(SERVICE_DEBUG_INFO, {}, SupportsResponse.ONLY),
|
||||||
]
|
]
|
||||||
|
|
||||||
# Register services with their schemas
|
# Register the services that depends on the switch
|
||||||
for service_name, schema, supports_response in services:
|
for service in services:
|
||||||
platform.async_register_entity_service(
|
platform.async_register_entity_service(
|
||||||
service_name,
|
service[0],
|
||||||
make_entity_service_schema(schema),
|
make_entity_service_schema(service[1]),
|
||||||
service_name,
|
service[0],
|
||||||
supports_response=supports_response
|
supports_response=service[2]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class ComputerSwitch(SwitchEntity):
|
class ComputerSwitch(SwitchEntity):
|
||||||
"""Representation of a computer switch entity."""
|
"""Representation of a computer switch."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -83,121 +110,216 @@ class ComputerSwitch(SwitchEntity):
|
|||||||
name: str,
|
name: str,
|
||||||
host: str | None,
|
host: str | None,
|
||||||
mac_address: str,
|
mac_address: str,
|
||||||
dualboot: bool,
|
# broadcast_address: str | None,
|
||||||
|
# broadcast_port: int | None,
|
||||||
|
dualboot: bool | False,
|
||||||
username: str,
|
username: str,
|
||||||
password: str,
|
password: str,
|
||||||
port: int | None,
|
port: int | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Initialize the computer switch entity."""
|
"""Initialize the WOL switch."""
|
||||||
self.hass = hass
|
|
||||||
self._attr_name = name
|
|
||||||
self._attr_unique_id = dr.format_mac(mac_address)
|
|
||||||
self._state = False
|
|
||||||
self._attr_should_poll = not self._attr_assumed_state
|
|
||||||
self._attr_extra_state_attributes = {}
|
|
||||||
|
|
||||||
self.computer = Computer(host, mac_address, username, password, port, dualboot)
|
self._hass = hass
|
||||||
|
self._attr_name = name
|
||||||
|
self._host = host
|
||||||
|
self._mac_address = mac_address
|
||||||
|
# self._broadcast_address = broadcast_address
|
||||||
|
# self._broadcast_port = broadcast_port
|
||||||
|
self._dualboot = dualboot
|
||||||
|
self._username = username
|
||||||
|
self._password = password
|
||||||
|
self._port = port
|
||||||
|
self._state = False
|
||||||
|
self._attr_assumed_state = host is None
|
||||||
|
self._attr_should_poll = bool(not self._attr_assumed_state)
|
||||||
|
self._attr_unique_id = dr.format_mac(mac_address)
|
||||||
|
self._attr_extra_state_attributes = {}
|
||||||
|
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def device_info(self) -> DeviceInfo:
|
def device_info(self) -> DeviceInfo | None:
|
||||||
"""Return device info for the registry."""
|
"""Return the device information."""
|
||||||
|
if self._host is None:
|
||||||
|
return None
|
||||||
|
|
||||||
return DeviceInfo(
|
return DeviceInfo(
|
||||||
identifiers={(DOMAIN, self.computer.mac)},
|
identifiers={(DOMAIN, self._mac_address)},
|
||||||
name=self._attr_name,
|
name=self._attr_name,
|
||||||
manufacturer="Generic",
|
manufacturer="Generic",
|
||||||
model="Computer",
|
model="Computer",
|
||||||
sw_version=self.computer.operating_system_version,
|
sw_version=utils.get_operating_system_version(
|
||||||
connections={(dr.CONNECTION_NETWORK_MAC, self.computer.mac)},
|
self._connection) if self._state else "Unknown",
|
||||||
|
connections={(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def icon(self) -> str:
|
def icon(self) -> str:
|
||||||
|
"""Return the icon to use in the frontend, if any."""
|
||||||
return "mdi:monitor" if self._state else "mdi:monitor-off"
|
return "mdi:monitor" if self._state else "mdi:monitor-off"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_on(self) -> bool:
|
def is_on(self) -> bool:
|
||||||
"""Return true if the computer is on."""
|
"""Return true if the computer switch is on."""
|
||||||
return self._state
|
return self._state
|
||||||
|
|
||||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
def turn_on(self, **kwargs: Any) -> None:
|
||||||
"""Turn the computer on using Wake-on-LAN."""
|
"""Turn the computer on using wake on lan."""
|
||||||
await self.computer.start()
|
service_kwargs: dict[str, Any] = {}
|
||||||
|
# if self._broadcast_address is not None:
|
||||||
|
# service_kwargs["ip_address"] = self._broadcast_address
|
||||||
|
# if self._broadcast_port is not None:
|
||||||
|
# service_kwargs["port"] = self._broadcast_port
|
||||||
|
|
||||||
|
_LOGGER.debug(
|
||||||
|
"Send magic packet to mac %s (broadcast: %s, port: %s)",
|
||||||
|
self._mac_address,
|
||||||
|
# self._broadcast_address,
|
||||||
|
# self._broadcast_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
wakeonlan.send_magic_packet(self._mac_address, **service_kwargs)
|
||||||
|
|
||||||
if self._attr_assumed_state:
|
if self._attr_assumed_state:
|
||||||
self._state = True
|
self._state = True
|
||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
|
|
||||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
def turn_off(self, **kwargs: Any) -> None:
|
||||||
"""Turn the computer off via shutdown command."""
|
"""Turn the computer off using appropriate shutdown command based on running OS and/or distro."""
|
||||||
await self.computer.shutdown()
|
utils.shutdown_system(self._connection)
|
||||||
|
|
||||||
if self._attr_assumed_state:
|
if self._attr_assumed_state:
|
||||||
self._state = False
|
self._state = False
|
||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
def restart_to_windows_from_linux(self) -> None:
|
||||||
"""Update the state by checking if the computer is on."""
|
"""Restart the computer to Windows from a running Linux by setting grub-reboot and restarting."""
|
||||||
is_on = await self.computer.is_on()
|
|
||||||
if self.is_on != is_on:
|
|
||||||
self._state = is_on
|
|
||||||
# self.async_write_ha_state()
|
|
||||||
|
|
||||||
# If the computer is on, update its attributes
|
if self._dualboot:
|
||||||
if is_on:
|
utils.restart_to_windows_from_linux(self._connection)
|
||||||
await self.computer.update(is_on)
|
else:
|
||||||
|
_LOGGER.error(
|
||||||
|
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||||
|
"correctly in the UI.",
|
||||||
|
self._host)
|
||||||
|
|
||||||
|
def restart_to_linux_from_windows(self) -> None:
|
||||||
|
"""Restart the computer to Linux from a running Windows by setting grub-reboot and restarting."""
|
||||||
|
|
||||||
|
if self._dualboot:
|
||||||
|
# TODO: check for default grub entry and adapt accordingly
|
||||||
|
utils.restart_system(self._connection)
|
||||||
|
else:
|
||||||
|
_LOGGER.error(
|
||||||
|
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||||
|
"correctly in the UI.",
|
||||||
|
self._host)
|
||||||
|
|
||||||
|
def put_computer_to_sleep(self) -> None:
|
||||||
|
"""Put the computer to sleep using appropriate sleep command based on running OS and/or distro."""
|
||||||
|
utils.sleep_system(self._connection)
|
||||||
|
|
||||||
|
def start_computer_to_windows(self) -> None:
|
||||||
|
"""Start the computer to Linux, wait for it to boot, and then set grub-reboot and restart."""
|
||||||
|
self.turn_on()
|
||||||
|
|
||||||
|
if self._dualboot:
|
||||||
|
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||||
|
self._hass.loop.create_task(self.service_restart_to_windows_from_linux())
|
||||||
|
|
||||||
|
else:
|
||||||
|
_LOGGER.error(
|
||||||
|
"The computer with the IP address %s is not running a dualboot system or hasn't been configured "
|
||||||
|
"correctly in the UI.",
|
||||||
|
self._host)
|
||||||
|
|
||||||
|
async def service_restart_to_windows_from_linux(self) -> None:
|
||||||
|
"""Method to be run in a separate thread to wait for the computer to boot and then reboot to Windows."""
|
||||||
|
while not self.is_on:
|
||||||
|
await asyncio.sleep(3)
|
||||||
|
|
||||||
|
await utils.restart_to_windows_from_linux(self._connection)
|
||||||
|
|
||||||
|
def restart_computer(self) -> None:
|
||||||
|
"""Restart the computer using appropriate restart command based on running OS and/or distro."""
|
||||||
|
|
||||||
|
# TODO: check for default grub entry and adapt accordingly
|
||||||
|
if self._dualboot and not utils.is_unix_system(connection=self._connection):
|
||||||
|
utils.restart_system(self._connection)
|
||||||
|
|
||||||
|
# Wait for the computer to boot using a dedicated thread to avoid blocking the main thread
|
||||||
|
self.restart_to_windows_from_linux()
|
||||||
|
else:
|
||||||
|
utils.restart_system(self._connection)
|
||||||
|
|
||||||
|
def change_monitors_config(self, monitors_config: dict | None = None) -> None:
|
||||||
|
"""Change the monitors configuration using a YAML config file."""
|
||||||
|
if monitors_config is not None and len(monitors_config) > 0:
|
||||||
|
utils.change_monitors_config(self._connection, monitors_config)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("The 'monitors_config' parameter must be a non-empty dictionary.")
|
||||||
|
|
||||||
|
def steam_big_picture(self, action: str) -> None:
|
||||||
|
"""Controls Steam Big Picture mode."""
|
||||||
|
|
||||||
|
if action is not None:
|
||||||
|
utils.steam_big_picture(self._connection, action)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("The 'action' parameter must be specified.")
|
||||||
|
|
||||||
|
def change_audio_config(self, volume: int | None = None, mute: bool | None = None, input_device: str | None = None,
|
||||||
|
output_device: str | None = None) -> None:
|
||||||
|
"""Change the audio configuration using a YAML config file."""
|
||||||
|
utils.change_audio_config(self._connection, volume, mute, input_device, output_device)
|
||||||
|
|
||||||
|
def update(self) -> None:
|
||||||
|
"""Ping the computer to see if it is online and update the state."""
|
||||||
|
timeout = 1
|
||||||
|
ping_cmd = ["ping", "-c", "1", "-W", str(timeout), str(self._host)]
|
||||||
|
status = sp.call(ping_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
|
||||||
|
self._state = not bool(status)
|
||||||
|
|
||||||
|
# Update the state attributes and the connection only if the computer is on
|
||||||
|
if self._state:
|
||||||
|
if self._connection is None or not utils.test_connection(self._connection):
|
||||||
|
self.renew_ssh_connection()
|
||||||
|
|
||||||
|
if not self._state:
|
||||||
|
return
|
||||||
|
|
||||||
self._attr_extra_state_attributes = {
|
self._attr_extra_state_attributes = {
|
||||||
"operating_system": self.computer.operating_system,
|
"operating_system": utils.get_operating_system(self._connection),
|
||||||
"operating_system_version": self.computer.operating_system_version,
|
"operating_system_version": utils.get_operating_system_version(self._connection),
|
||||||
"mac_address": self.computer.mac,
|
"mac_address": self._mac_address,
|
||||||
"ip_address": self.computer.host,
|
"ip_address": self._host,
|
||||||
"connected_devices": get_bluetooth_devices_as_str(self.computer),
|
"connected_devices": utils.get_bluetooth_devices(self._connection, only_connected=True, return_as_string=True),
|
||||||
}
|
}
|
||||||
|
|
||||||
# Service methods for various functionalities
|
def renew_ssh_connection(self) -> None:
|
||||||
async def restart_to_windows_from_linux(self) -> None:
|
"""Renew the SSH connection."""
|
||||||
"""Restart the computer from Linux to Windows."""
|
_LOGGER.info("Renewing SSH connection to %s using username %s", self._host, self._username)
|
||||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
|
||||||
|
|
||||||
async def restart_to_linux_from_windows(self) -> None:
|
if self._connection is not None:
|
||||||
"""Restart the computer from Windows to Linux."""
|
self._connection.close()
|
||||||
await self.computer.restart(OSType.WINDOWS, OSType.LINUX)
|
|
||||||
|
|
||||||
async def put_computer_to_sleep(self) -> None:
|
try:
|
||||||
"""Put the computer to sleep."""
|
self._connection = utils.create_ssh_connection(self._host, self._username, self._password)
|
||||||
await self.computer.put_to_sleep()
|
self._connection.open()
|
||||||
|
except AuthenticationException as error:
|
||||||
|
_LOGGER.error("Could not authenticate to %s using username %s: %s", self._host, self._username, error)
|
||||||
|
self._state = False
|
||||||
|
except Exception as error:
|
||||||
|
_LOGGER.error("Could not connect to %s using username %s: %s", self._host, self._username, error)
|
||||||
|
|
||||||
async def start_computer_to_windows(self) -> None:
|
# Check if the error is due to timeout
|
||||||
"""Start the computer to Windows after booting into Linux first."""
|
if "timed out" in str(error):
|
||||||
await self.computer.start()
|
_LOGGER.warning(
|
||||||
|
"Computer at %s does not respond to the SSH request. Possible causes: might be offline, "
|
||||||
|
"the firewall is blocking the SSH port, or the SSH server is offline and/or misconfigured.",
|
||||||
|
self._host
|
||||||
|
)
|
||||||
|
|
||||||
async def wait_and_reboot() -> None:
|
self._state = False
|
||||||
"""Wait until the computer is on, then restart to Windows."""
|
|
||||||
while not await self.computer.is_on():
|
|
||||||
await asyncio.sleep(3)
|
|
||||||
await self.computer.restart(OSType.LINUX, OSType.WINDOWS)
|
|
||||||
|
|
||||||
self.hass.loop.create_task(wait_and_reboot())
|
def debug_info(self) -> ServiceResponse:
|
||||||
|
"""Prints debug info."""
|
||||||
async def restart_computer(self) -> None:
|
return utils.get_debug_info(self._connection)
|
||||||
"""Restart the computer."""
|
|
||||||
await self.computer.restart()
|
|
||||||
|
|
||||||
async def change_monitors_config(self, monitors_config: Dict[str, Any]) -> None:
|
|
||||||
"""Change the monitor configuration."""
|
|
||||||
await self.computer.set_monitors_config(monitors_config)
|
|
||||||
|
|
||||||
async def steam_big_picture(self, action: str) -> None:
|
|
||||||
"""Control Steam Big Picture mode."""
|
|
||||||
await self.computer.steam_big_picture(action)
|
|
||||||
|
|
||||||
async def change_audio_config(
|
|
||||||
self, volume: int | None = None, mute: bool | None = None,
|
|
||||||
input_device: str | None = None, output_device: str | None = None
|
|
||||||
) -> None:
|
|
||||||
"""Change the audio configuration."""
|
|
||||||
await self.computer.set_audio_config(volume, mute, input_device, output_device)
|
|
||||||
|
|
||||||
async def debug_info(self) -> ServiceResponse:
|
|
||||||
"""Return debug information."""
|
|
||||||
return await format_debug_information(self.computer)
|
|
||||||
|
550
custom_components/easy_computer_manager/utils.py
Normal file
550
custom_components/easy_computer_manager/utils.py
Normal file
@ -0,0 +1,550 @@
|
|||||||
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
|
import fabric2
|
||||||
|
from fabric2 import Connection
|
||||||
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# _LOGGER.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def create_ssh_connection(host: str, username: str, password: str, port=22):
|
||||||
|
"""Create an SSH connection to a host using a username and password specified in the config flow."""
|
||||||
|
conf = fabric2.Config()
|
||||||
|
conf.run.hide = True
|
||||||
|
conf.run.warn = True
|
||||||
|
conf.warn = True
|
||||||
|
conf.sudo.password = password
|
||||||
|
conf.password = password
|
||||||
|
|
||||||
|
connection = Connection(
|
||||||
|
host=host, user=username, port=port, connect_timeout=3, connect_kwargs={"password": password},
|
||||||
|
config=conf
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER.info("Successfully created SSH connection to %s using username %s", host, username)
|
||||||
|
|
||||||
|
return connection
|
||||||
|
|
||||||
|
|
||||||
|
def test_connection(connection: Connection):
|
||||||
|
"""Test the connection to the host by running a simple command."""
|
||||||
|
try:
|
||||||
|
connection.run('ls')
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def is_unix_system(connection: Connection):
|
||||||
|
"""Return a boolean based on get_operating_system result."""
|
||||||
|
return get_operating_system(connection) == "Linux/Unix"
|
||||||
|
|
||||||
|
|
||||||
|
def get_operating_system_version(connection: Connection, is_unix=None):
|
||||||
|
"""Return the running operating system name and version."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
if is_unix:
|
||||||
|
result = connection.run(
|
||||||
|
"awk -F'=' '/^NAME=|^VERSION=/{gsub(/\"/, \"\", $2); printf $2\" \"}\' /etc/os-release && echo").stdout
|
||||||
|
if result == "":
|
||||||
|
result = connection.run("lsb_release -a | awk '/Description/ {print $2, $3, $4}'").stdout
|
||||||
|
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return connection.run(
|
||||||
|
'for /f "tokens=1 delims=|" %i in (\'wmic os get Name ^| findstr /B /C:"Microsoft"\') do @echo %i').stdout
|
||||||
|
|
||||||
|
|
||||||
|
def get_operating_system(connection: Connection):
|
||||||
|
"""Return the running operating system type."""
|
||||||
|
# TODO: might be a better way to do this
|
||||||
|
result = connection.run("uname")
|
||||||
|
if result.return_code == 0:
|
||||||
|
return "Linux/Unix"
|
||||||
|
else:
|
||||||
|
return "Windows/Other"
|
||||||
|
|
||||||
|
|
||||||
|
def shutdown_system(connection: Connection, is_unix=None):
|
||||||
|
"""Shutdown the system."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
shutdown_commands = {
|
||||||
|
"unix": ["sudo shutdown -h now", "sudo init 0", "sudo systemctl poweroff"],
|
||||||
|
"windows": ["shutdown /s /t 0", "wmic os where Primary=TRUE call Shutdown"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in shutdown_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System shutting down on %s.", connection.host)
|
||||||
|
connection.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot shutdown system running at {connection.host}, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def restart_system(connection: Connection, is_unix=None):
|
||||||
|
"""Restart the system."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
restart_commands = {
|
||||||
|
"unix": ["sudo shutdown -r now", "sudo init 6", "sudo systemctl reboot"],
|
||||||
|
"windows": ["shutdown /r /t 0", "wmic os where Primary=TRUE call Reboot"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in restart_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System restarting on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot restart system running at {connection.host}, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def sleep_system(connection: Connection, is_unix=None):
|
||||||
|
"""Put the system to sleep."""
|
||||||
|
|
||||||
|
if is_unix is None:
|
||||||
|
is_unix = is_unix_system(connection)
|
||||||
|
|
||||||
|
sleep_commands = {
|
||||||
|
"unix": ["sudo systemctl suspend", "sudo pm-suspend"],
|
||||||
|
"windows": ["shutdown /h /t 0", "rundll32.exe powrprof.dll,SetSuspendState Sleep"]
|
||||||
|
}
|
||||||
|
|
||||||
|
for command in sleep_commands["unix" if is_unix else "windows"]:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("System sleeping on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Cannot put system running at {connection.host} to sleep, all methods failed.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_windows_entry_in_grub(connection: Connection):
|
||||||
|
"""
|
||||||
|
Grabs the Windows entry name in GRUB.
|
||||||
|
Used later with grub-reboot to specify which entry to boot.
|
||||||
|
"""
|
||||||
|
commands = [
|
||||||
|
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub/grub.cfg",
|
||||||
|
"sudo awk -F \"'\" '/windows/ {print $2}' /boot/grub2/grub.cfg"
|
||||||
|
]
|
||||||
|
|
||||||
|
for command in commands:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code == 0 and result.stdout.strip():
|
||||||
|
_LOGGER.debug("Found Windows entry in GRUB: " + result.stdout.strip())
|
||||||
|
return result.stdout.strip()
|
||||||
|
|
||||||
|
_LOGGER.error("Could not find Windows entry in GRUB for system running at %s.", connection.host)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def restart_to_windows_from_linux(connection: Connection):
|
||||||
|
"""Restart a running Linux system to Windows."""
|
||||||
|
|
||||||
|
if not is_unix_system(connection):
|
||||||
|
raise HomeAssistantError(f"System running at {connection.host} is not a Linux system.")
|
||||||
|
|
||||||
|
windows_entry = get_windows_entry_in_grub(connection)
|
||||||
|
|
||||||
|
if windows_entry is not None:
|
||||||
|
reboot_commands = ["sudo grub-reboot", "sudo grub2-reboot"]
|
||||||
|
|
||||||
|
for reboot_command in reboot_commands:
|
||||||
|
result = connection.run(f"{reboot_command} \"{windows_entry}\"")
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.debug("Rebooting to Windows")
|
||||||
|
restart_system(connection)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise HomeAssistantError(f"Failed to restart system running on {connection.host} to Windows from Linux.")
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError(f"Could not find Windows entry in grub for system running at {connection.host}.")
|
||||||
|
|
||||||
|
|
||||||
|
def change_monitors_config(connection: Connection, monitors_config: dict):
|
||||||
|
"""Change monitors configuration on the host (Linux + Gnome, and partial Windows support)."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
command_parts = ["gnome-monitor-config", "set"]
|
||||||
|
|
||||||
|
for monitor, settings in monitors_config.items():
|
||||||
|
if settings.get('enabled', False):
|
||||||
|
command_parts.extend(['-LpM' if settings.get('primary', False) else '-LM', monitor])
|
||||||
|
|
||||||
|
if 'position' in settings:
|
||||||
|
command_parts.extend(['-x', str(settings["position"][0]), '-y', str(settings["position"][1])])
|
||||||
|
|
||||||
|
if 'mode' in settings:
|
||||||
|
command_parts.extend(['-m', settings["mode"]])
|
||||||
|
|
||||||
|
if 'scale' in settings:
|
||||||
|
command_parts.extend(['-s', str(settings["scale"])])
|
||||||
|
|
||||||
|
if 'transform' in settings:
|
||||||
|
command_parts.extend(['-t', settings["transform"]])
|
||||||
|
|
||||||
|
command = ' '.join(command_parts)
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
|
||||||
|
result = connection.run(command)
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||||
|
|
||||||
|
# Run it once again, it fixes some strange Gnome display bug sometimes and it doesn't hurt
|
||||||
|
connection.run(command)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||||
|
connection.host)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
# TODO: Implement Windows support using NIRCMD
|
||||||
|
command_parts = ["nircmd.exe", "setdisplay"]
|
||||||
|
# setdisplay {monitor:index/name} [width] [height] [color bits] {refresh rate} {-updatereg} {-allusers}
|
||||||
|
|
||||||
|
for monitor, settings in monitors_config.items():
|
||||||
|
if settings.get('enabled', False):
|
||||||
|
command_parts.extend(
|
||||||
|
[f'{monitor} -primary' if settings.get('primary', False) else f'{monitor} -secondary'])
|
||||||
|
|
||||||
|
if 'resolution' in settings:
|
||||||
|
command_parts.extend([str(settings["resolution"][0]), str(settings["resolution"][1])])
|
||||||
|
|
||||||
|
if 'refresh_rate' in settings:
|
||||||
|
command_parts.extend(['-hz', str(settings["refresh_rate"])])
|
||||||
|
|
||||||
|
if 'color_bits' in settings:
|
||||||
|
command_parts.extend(['-bits', str(settings["color_bits"])])
|
||||||
|
|
||||||
|
command = ' '.join(command_parts)
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
|
||||||
|
result = connection.run(command)
|
||||||
|
|
||||||
|
if result.return_code == 0:
|
||||||
|
_LOGGER.info("Successfully changed monitors config on system running on %s.", connection.host)
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Could not change monitors config on system running on %s, check logs with debug",
|
||||||
|
connection.host)
|
||||||
|
|
||||||
|
|
||||||
|
def silent_install_nircmd(connection: Connection):
|
||||||
|
"""Silently install NIRCMD on a Windows system."""
|
||||||
|
|
||||||
|
if not is_unix_system(connection):
|
||||||
|
download_url = "https://www.nirsoft.net/utils/nircmd.zip"
|
||||||
|
install_path = f"C:\\Users\\{connection.user}\\AppData\\Local\\EasyComputerManager"
|
||||||
|
|
||||||
|
# Download and unzip NIRCMD
|
||||||
|
download_command = f"powershell -Command \"Invoke-WebRequest -Uri {download_url} -OutFile {install_path}\\nircmd.zip -UseBasicParsing\""
|
||||||
|
unzip_command = f"powershell -Command \"Expand-Archive {install_path}\\nircmd.zip -DestinationPath {install_path}\""
|
||||||
|
remove_zip_command = f"powershell -Command \"Remove-Item {install_path}\\nircmd.zip\""
|
||||||
|
|
||||||
|
commands = [download_command, unzip_command, remove_zip_command]
|
||||||
|
|
||||||
|
for command in commands:
|
||||||
|
result = connection.run(command)
|
||||||
|
if result.return_code != 0:
|
||||||
|
_LOGGER.error("Could not install NIRCMD on system running on %s.", connection.host)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def get_monitors_config(connection: Connection) -> dict:
|
||||||
|
"""Parse the output of the gnome-monitor-config command to get the current monitor configuration."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run("gnome-monitor-config list")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get monitors config on system running at {connection.host}. "
|
||||||
|
f"Make sure gnome-monitor-config package is installed.")
|
||||||
|
|
||||||
|
monitors = []
|
||||||
|
current_monitor = None
|
||||||
|
|
||||||
|
for line in result.stdout.split('\n'):
|
||||||
|
monitor_match = re.match(r'^Monitor \[ (.+?) \] (ON|OFF)$', line)
|
||||||
|
if monitor_match:
|
||||||
|
if current_monitor:
|
||||||
|
monitors.append(current_monitor)
|
||||||
|
source, status = monitor_match.groups()
|
||||||
|
current_monitor = {'source': source, 'status': status, 'names': [], 'resolutions': []}
|
||||||
|
elif current_monitor:
|
||||||
|
display_name_match = re.match(r'^\s+display-name: (.+)$', line)
|
||||||
|
resolution_match = re.match(r'^\s+(\d+x\d+@\d+(?:\.\d+)?).*$', line)
|
||||||
|
if display_name_match:
|
||||||
|
current_monitor['names'].append(display_name_match.group(1).replace('"', ''))
|
||||||
|
elif resolution_match:
|
||||||
|
# Don't include resolutions under 1280x720
|
||||||
|
if int(resolution_match.group(1).split('@')[0].split('x')[0]) >= 1280:
|
||||||
|
|
||||||
|
# If there are already resolutions in the list, check if the framerate between the last is >1
|
||||||
|
if len(current_monitor['resolutions']) > 0:
|
||||||
|
last_resolution = current_monitor['resolutions'][-1]
|
||||||
|
last_resolution_size = last_resolution.split('@')[0]
|
||||||
|
this_resolution_size = resolution_match.group(1).split('@')[0]
|
||||||
|
|
||||||
|
# Only truncate some framerates if the resolution are the same
|
||||||
|
if last_resolution_size == this_resolution_size:
|
||||||
|
last_resolution_framerate = float(last_resolution.split('@')[1])
|
||||||
|
this_resolution_framerate = float(resolution_match.group(1).split('@')[1])
|
||||||
|
|
||||||
|
# If the difference between the last resolution framerate and this one is >1, ignore it
|
||||||
|
if last_resolution_framerate - 1 > this_resolution_framerate:
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
else:
|
||||||
|
# If the resolution is different, this adds the new resolution
|
||||||
|
# to the list without truncating
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
else:
|
||||||
|
# This is the first resolution, add it to the list
|
||||||
|
current_monitor['resolutions'].append(resolution_match.group(1))
|
||||||
|
|
||||||
|
if current_monitor:
|
||||||
|
monitors.append(current_monitor)
|
||||||
|
|
||||||
|
return monitors
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def steam_big_picture(connection: Connection, action: str):
|
||||||
|
"""Controls Steam in Big Picture mode on the host."""
|
||||||
|
|
||||||
|
_LOGGER.debug(f"Running Steam Big Picture action {action} on system running at {connection.host}.")
|
||||||
|
|
||||||
|
steam_commands = {
|
||||||
|
"start": {
|
||||||
|
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -bigpicture &",
|
||||||
|
"windows": "start steam://open/bigpicture"
|
||||||
|
},
|
||||||
|
"stop": {
|
||||||
|
"unix": "export WAYLAND_DISPLAY=wayland-0; export DISPLAY=:0; steam -shutdown &",
|
||||||
|
"windows": "C:\\Program Files (x86)\\Steam\\steam.exe -shutdown"
|
||||||
|
# TODO: check for different Steam install paths
|
||||||
|
},
|
||||||
|
"exit": {
|
||||||
|
"unix": None, # TODO: find a way to exit Steam Big Picture
|
||||||
|
"windows": "nircmd win close title \"Steam Big Picture Mode\""
|
||||||
|
# TODO: need to test (thx @MasterHidra https://www.reddit.com/r/Steam/comments/5c9l20/comment/k5fmb3k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
command = steam_commands.get(action)
|
||||||
|
|
||||||
|
if command is None:
|
||||||
|
raise HomeAssistantError(
|
||||||
|
f"Invalid action {action} for Steam Big Picture on system running at {connection.host}.")
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run(command.get("unix"))
|
||||||
|
else:
|
||||||
|
result = connection.run(command.get("windows"))
|
||||||
|
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not {action} Steam Big Picture on system running at {connection.host}.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_audio_config(connection: Connection):
|
||||||
|
if is_unix_system(connection):
|
||||||
|
config = {'sinks': [], 'sources': []}
|
||||||
|
|
||||||
|
def parse_device_info(lines, device_type):
|
||||||
|
devices = []
|
||||||
|
current_device = {}
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
if line.startswith(f"{device_type} #"):
|
||||||
|
if current_device and "Monitor" not in current_device['description']:
|
||||||
|
devices.append(current_device)
|
||||||
|
current_device = {'id': int(re.search(r'#(\d+)', line).group(1))}
|
||||||
|
elif line.startswith(" Name:"):
|
||||||
|
current_device['name'] = line.split(":")[1].strip()
|
||||||
|
elif line.startswith(" State:"):
|
||||||
|
current_device['state'] = line.split(":")[1].strip()
|
||||||
|
elif line.startswith(" Description:"):
|
||||||
|
current_device['description'] = line.split(":")[1].strip()
|
||||||
|
|
||||||
|
if current_device:
|
||||||
|
devices.append(current_device)
|
||||||
|
|
||||||
|
return devices
|
||||||
|
|
||||||
|
# Get sinks
|
||||||
|
result = connection.run("LANG=en_US.UTF-8 pactl list sinks")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get audio sinks on system running at {connection.host}. "
|
||||||
|
f"Make sure pactl package is installed!")
|
||||||
|
config['sinks'] = parse_device_info(result.stdout.split('\n'), 'Sink')
|
||||||
|
|
||||||
|
# Get sources
|
||||||
|
result = connection.run("LANG=en_US.UTF-8 pactl list sources")
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(f"Could not get audio sources on system running at {connection.host}."
|
||||||
|
f"Make sure pactl package is installed!")
|
||||||
|
config['sources'] = parse_device_info(result.stdout.split('\n'), 'Source')
|
||||||
|
|
||||||
|
return config
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def change_audio_config(connection: Connection, volume: int, mute: bool, input_device: str = "@DEFAULT_SOURCE@",
|
||||||
|
output_device: str = "@DEFAULT_SINK@"):
|
||||||
|
"""Change audio configuration on the host system."""
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
current_config = get_audio_config(connection)
|
||||||
|
executable = "pactl"
|
||||||
|
commands = []
|
||||||
|
|
||||||
|
def get_device_id(device_type, user_device):
|
||||||
|
for device in current_config[device_type]:
|
||||||
|
if device['description'] == user_device:
|
||||||
|
return device['name']
|
||||||
|
return user_device
|
||||||
|
|
||||||
|
# Set default sink and source if not specified
|
||||||
|
if not output_device:
|
||||||
|
output_device = "@DEFAULT_SINK@"
|
||||||
|
if not input_device:
|
||||||
|
input_device = "@DEFAULT_SOURCE@"
|
||||||
|
|
||||||
|
# Set default sink if specified
|
||||||
|
if output_device and output_device != "@DEFAULT_SINK@":
|
||||||
|
output_device = get_device_id('sinks', output_device)
|
||||||
|
commands.append(f"{executable} set-default-sink {output_device}")
|
||||||
|
|
||||||
|
# Set default source if specified
|
||||||
|
if input_device and input_device != "@DEFAULT_SOURCE@":
|
||||||
|
input_device = get_device_id('sources', input_device)
|
||||||
|
commands.append(f"{executable} set-default-source {input_device}")
|
||||||
|
|
||||||
|
# Set sink volume if specified
|
||||||
|
if volume is not None:
|
||||||
|
commands.append(f"{executable} set-sink-volume {output_device} {volume}%")
|
||||||
|
|
||||||
|
# Set sink and source mute status if specified
|
||||||
|
if mute is not None:
|
||||||
|
commands.append(f"{executable} set-sink-mute {output_device} {'yes' if mute else 'no'}")
|
||||||
|
commands.append(f"{executable} set-source-mute {input_device} {'yes' if mute else 'no'}")
|
||||||
|
|
||||||
|
# Execute commands
|
||||||
|
for command in commands:
|
||||||
|
_LOGGER.debug("Running command: %s", command)
|
||||||
|
result = connection.run(command)
|
||||||
|
|
||||||
|
if result.return_code != 0:
|
||||||
|
raise HomeAssistantError(
|
||||||
|
f"Could not change audio config on system running on {connection.host}, check logs with debug and"
|
||||||
|
f"make sure pactl package is installed!")
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_debug_info(connection: Connection):
|
||||||
|
"""Return debug information about the host system."""
|
||||||
|
|
||||||
|
data = {}
|
||||||
|
|
||||||
|
data_os = {
|
||||||
|
'name': get_operating_system(connection),
|
||||||
|
'version': get_operating_system_version(connection),
|
||||||
|
'is_unix': is_unix_system(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
data_ssh = {
|
||||||
|
'is_connected': connection.is_connected,
|
||||||
|
'username': connection.user,
|
||||||
|
'host': connection.host,
|
||||||
|
'port': connection.port
|
||||||
|
}
|
||||||
|
|
||||||
|
data_grub = {
|
||||||
|
'windows_entry': get_windows_entry_in_grub(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
data_audio = {
|
||||||
|
'speakers': get_audio_config(connection).get('sinks'),
|
||||||
|
'microphones': get_audio_config(connection).get('sources')
|
||||||
|
}
|
||||||
|
|
||||||
|
data['os'] = data_os
|
||||||
|
data['ssh'] = data_ssh
|
||||||
|
data['grub'] = data_grub
|
||||||
|
data['audio'] = data_audio
|
||||||
|
data['monitors'] = get_monitors_config(connection)
|
||||||
|
data['bluetooth_devices'] = get_bluetooth_devices(connection)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def get_bluetooth_devices(connection: Connection, only_connected: bool = False, return_as_string: bool = False) -> []:
|
||||||
|
commands = {
|
||||||
|
"unix": "bluetoothctl info",
|
||||||
|
"windows": "",
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_unix_system(connection):
|
||||||
|
result = connection.run(commands["unix"])
|
||||||
|
if result.return_code != 0:
|
||||||
|
if result.stdout.__contains__("Missing device address argument"): # Means no devices are connected
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
_LOGGER.warning(f"Cannot retrieve bluetooth devices at {connection.host}. "
|
||||||
|
f"Make sure bluetoothctl is installed!")
|
||||||
|
return []
|
||||||
|
|
||||||
|
devices = []
|
||||||
|
current_device = None
|
||||||
|
|
||||||
|
for line in result.stdout.split('\n'):
|
||||||
|
if line.startswith('Device'):
|
||||||
|
if current_device is not None:
|
||||||
|
devices.append({
|
||||||
|
"address": current_device,
|
||||||
|
"name": current_name,
|
||||||
|
"connected": current_connected
|
||||||
|
})
|
||||||
|
current_device = line.split()[1]
|
||||||
|
current_name = None
|
||||||
|
current_connected = None
|
||||||
|
elif 'Name:' in line:
|
||||||
|
current_name = line.split(': ', 1)[1]
|
||||||
|
elif 'Connected:' in line:
|
||||||
|
current_connected = line.split(': ')[1] == 'yes'
|
||||||
|
|
||||||
|
# Add the last device if any
|
||||||
|
if current_device is not None:
|
||||||
|
devices.append({
|
||||||
|
"address": current_device,
|
||||||
|
"name": current_name,
|
||||||
|
"connected": current_connected
|
||||||
|
})
|
||||||
|
|
||||||
|
if only_connected:
|
||||||
|
devices = [device for device in devices if device["connected"] == "yes"]
|
||||||
|
|
||||||
|
if return_as_string:
|
||||||
|
devices = "; ".join([f"{device['name']} ({device['address']})" for device in devices])
|
||||||
|
|
||||||
|
return devices
|
||||||
|
else:
|
||||||
|
raise HomeAssistantError("Not implemented yet for Windows OS.")
|
@ -1,4 +1,5 @@
|
|||||||
|
fabric2~=3.2.2
|
||||||
|
paramiko~=3.4.0
|
||||||
|
voluptuous~=0.14.2
|
||||||
wakeonlan~=3.1.0
|
wakeonlan~=3.1.0
|
||||||
homeassistant
|
homeassistant~=2024.3.1
|
||||||
paramiko~=3.5.0
|
|
||||||
voluptuous~=0.15.2
|
|
Loading…
Reference in New Issue
Block a user