mirror of
https://github.com/NanmiCoder/cc-haha.git
synced 2026-05-06 23:31:17 +08:00
770 lines
27 KiB
Python
770 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""Windows Computer Use helper — same JSON protocol as mac_helper.py.
|
|
|
|
Uses win32gui / win32api / win32process / psutil / pyperclip / screeninfo
|
|
to replicate macOS-specific Quartz/AppKit functionality on Windows.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import mss
|
|
from PIL import Image
|
|
|
|
os.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1")
|
|
os.environ.setdefault("PYAUTOGUI_HIDE_SUPPORT_PROMPT", "1")
|
|
|
|
import pyautogui # noqa: E402
|
|
|
|
# The desktop app decodes helper stdout as UTF-8. On Windows, redirected Python
|
|
# stdout defaults to the active ANSI code page (for example GBK), which mangles
|
|
# localized app names from the registry. Force UTF-8 at process start so JSON
|
|
# responses stay stable regardless of the user's system locale.
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8", errors="strict")
|
|
if hasattr(sys.stderr, "reconfigure"):
|
|
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
|
|
|
|
pyautogui.FAILSAFE = False
|
|
pyautogui.PAUSE = 0
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Key mapping — Windows uses 'win' instead of 'command'
|
|
# ---------------------------------------------------------------------------
|
|
KEY_MAP = {
|
|
"a": "a", "b": "b", "c": "c", "d": "d", "e": "e",
|
|
"f": "f", "g": "g", "h": "h", "i": "i", "j": "j",
|
|
"k": "k", "l": "l", "m": "m", "n": "n", "o": "o",
|
|
"p": "p", "q": "q", "r": "r", "s": "s", "t": "t",
|
|
"u": "u", "v": "v", "w": "w", "x": "x", "y": "y",
|
|
"z": "z",
|
|
"0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
|
|
"5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
|
|
# Modifier keys — map macOS names to Windows equivalents
|
|
"cmd": "win",
|
|
"command": "win",
|
|
"meta": "win",
|
|
"super": "win",
|
|
"ctrl": "ctrl",
|
|
"control": "ctrl",
|
|
"shift": "shift",
|
|
"alt": "alt",
|
|
"option": "alt",
|
|
"opt": "alt",
|
|
"fn": "fn",
|
|
# Navigation / editing
|
|
"escape": "esc",
|
|
"esc": "esc",
|
|
"enter": "enter",
|
|
"return": "enter",
|
|
"tab": "tab",
|
|
"space": "space",
|
|
"backspace": "backspace",
|
|
"delete": "delete",
|
|
"forwarddelete": "delete",
|
|
"up": "up",
|
|
"down": "down",
|
|
"left": "left",
|
|
"right": "right",
|
|
"home": "home",
|
|
"end": "end",
|
|
"pageup": "pageup",
|
|
"pagedown": "pagedown",
|
|
"capslock": "capslock",
|
|
# Function keys
|
|
"f1": "f1", "f2": "f2", "f3": "f3", "f4": "f4",
|
|
"f5": "f5", "f6": "f6", "f7": "f7", "f8": "f8",
|
|
"f9": "f9", "f10": "f10", "f11": "f11", "f12": "f12",
|
|
# Symbols
|
|
"-": "-", "=": "=", "[": "[", "]": "]", "\\": "\\",
|
|
";": ";", "'": "'", ",": ",", ".": ".", "/": "/", "`": "`",
|
|
}
|
|
|
|
|
|
def normalize_key(name: str) -> str:
|
|
key = name.strip().lower()
|
|
if key not in KEY_MAP:
|
|
raise ValueError(f"Unsupported key: {name}")
|
|
return KEY_MAP[key]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSON output helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def json_output(payload: dict[str, Any]) -> None:
|
|
sys.stdout.write(json.dumps(payload, ensure_ascii=False))
|
|
sys.stdout.write("\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
def error_output(message: str, code: str = "runtime_error") -> None:
|
|
json_output({"ok": False, "error": {"code": code, "message": message}})
|
|
|
|
|
|
def bool_env(name: str, default: bool = False) -> bool:
|
|
value = os.environ.get(name)
|
|
if value is None:
|
|
return default
|
|
return value not in {"0", "false", "False", ""}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Display / Monitor helpers (via screeninfo + ctypes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_displays() -> list[dict[str, Any]]:
|
|
"""Enumerate monitors via screeninfo, with DPI scale from ctypes."""
|
|
from screeninfo import get_monitors
|
|
|
|
displays: list[dict[str, Any]] = []
|
|
for idx, m in enumerate(get_monitors()):
|
|
scale_factor = _get_monitor_scale(m)
|
|
name = m.name or f"Display {idx + 1}"
|
|
displays.append({
|
|
"id": idx,
|
|
"displayId": idx,
|
|
"width": m.width,
|
|
"height": m.height,
|
|
"scaleFactor": scale_factor,
|
|
"originX": m.x,
|
|
"originY": m.y,
|
|
"isPrimary": m.is_primary if hasattr(m, "is_primary") else (idx == 0),
|
|
"name": name,
|
|
"label": name,
|
|
})
|
|
return displays
|
|
|
|
|
|
def _get_monitor_scale(monitor: Any) -> float:
|
|
"""Get the DPI scale factor for a monitor. Returns 1.0 on failure."""
|
|
try:
|
|
import ctypes
|
|
# SetProcessDPIAware so we get real pixel values
|
|
ctypes.windll.user32.SetProcessDPIAware()
|
|
# Get DPI for the primary — simplified; per-monitor DPI is complex
|
|
hdc = ctypes.windll.user32.GetDC(0)
|
|
dpi = ctypes.windll.gdi32.GetDeviceCaps(hdc, 88) # LOGPIXELSX
|
|
ctypes.windll.user32.ReleaseDC(0, hdc)
|
|
return dpi / 96.0
|
|
except Exception:
|
|
return 1.0
|
|
|
|
|
|
def choose_display(display_id: int | None) -> dict[str, Any]:
|
|
displays = get_displays()
|
|
if not displays:
|
|
raise RuntimeError("No active displays found")
|
|
if display_id is None:
|
|
for display in displays:
|
|
if display["isPrimary"]:
|
|
return display
|
|
return displays[0]
|
|
for display in displays:
|
|
if display["displayId"] == display_id or display["id"] == display_id:
|
|
return display
|
|
raise RuntimeError(f"Unknown display: {display_id}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Screen capture (mss — cross-platform, identical to mac_helper)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def capture_display(display_id: int | None, resize: tuple[int, int] | None = None) -> dict[str, Any]:
|
|
display = choose_display(display_id)
|
|
monitor = {
|
|
"left": display["originX"],
|
|
"top": display["originY"],
|
|
"width": display["width"],
|
|
"height": display["height"],
|
|
}
|
|
with mss.mss() as sct:
|
|
raw = sct.grab(monitor)
|
|
image = Image.frombytes("RGB", raw.size, raw.rgb)
|
|
if resize:
|
|
image = image.resize(resize, Image.Resampling.LANCZOS)
|
|
buffer = BytesIO()
|
|
image.save(buffer, format="JPEG", quality=75, optimize=True)
|
|
base64_data = base64.b64encode(buffer.getvalue()).decode("ascii")
|
|
return {
|
|
"base64": base64_data,
|
|
"width": image.width,
|
|
"height": image.height,
|
|
"displayWidth": display["width"],
|
|
"displayHeight": display["height"],
|
|
"displayId": display["displayId"],
|
|
"originX": display["originX"],
|
|
"originY": display["originY"],
|
|
"display": display,
|
|
}
|
|
|
|
|
|
def capture_region(region: dict[str, int], resize: tuple[int, int] | None = None) -> dict[str, Any]:
|
|
with mss.mss() as sct:
|
|
raw = sct.grab(region)
|
|
image = Image.frombytes("RGB", raw.size, raw.rgb)
|
|
if resize:
|
|
image = image.resize(resize, Image.Resampling.LANCZOS)
|
|
buffer = BytesIO()
|
|
image.save(buffer, format="JPEG", quality=75, optimize=True)
|
|
base64_data = base64.b64encode(buffer.getvalue()).decode("ascii")
|
|
return {"base64": base64_data, "width": image.width, "height": image.height}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Window management (win32gui)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def list_windows() -> list[dict[str, Any]]:
|
|
"""List visible on-screen windows with their bounds."""
|
|
import win32gui
|
|
|
|
results: list[dict[str, Any]] = []
|
|
|
|
def _enum_cb(hwnd: int, _: Any) -> None:
|
|
if not win32gui.IsWindowVisible(hwnd):
|
|
return
|
|
title = win32gui.GetWindowText(hwnd)
|
|
try:
|
|
left, top, right, bottom = win32gui.GetWindowRect(hwnd)
|
|
except Exception:
|
|
return
|
|
width = right - left
|
|
height = bottom - top
|
|
if width <= 1 or height <= 1:
|
|
return
|
|
# Get the process name as owner
|
|
owner = _get_window_process_name(hwnd)
|
|
results.append({
|
|
"ownerName": owner,
|
|
"title": title,
|
|
"bounds": {"x": left, "y": top, "width": width, "height": height},
|
|
})
|
|
|
|
win32gui.EnumWindows(_enum_cb, None)
|
|
return results
|
|
|
|
|
|
def _get_window_process_name(hwnd: int) -> str:
|
|
"""Get the exe name of the process owning a window handle."""
|
|
try:
|
|
import win32process
|
|
import psutil
|
|
_, pid = win32process.GetWindowThreadProcessId(hwnd)
|
|
proc = psutil.Process(pid)
|
|
return proc.name()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Application management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_exe_path_for_pid(pid: int) -> str | None:
|
|
try:
|
|
import psutil
|
|
return psutil.Process(pid).exe()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def installed_apps() -> list[dict[str, Any]]:
|
|
"""List installed programs from Windows registry and Start Menu shortcuts."""
|
|
import winreg
|
|
|
|
results: dict[str, dict[str, Any]] = {}
|
|
reg_paths = [
|
|
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
(winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
]
|
|
|
|
for hive, sub_key in reg_paths:
|
|
try:
|
|
key = winreg.OpenKey(hive, sub_key)
|
|
except OSError:
|
|
continue
|
|
try:
|
|
i = 0
|
|
while True:
|
|
try:
|
|
name = winreg.EnumKey(key, i)
|
|
i += 1
|
|
except OSError:
|
|
break
|
|
try:
|
|
app_key = winreg.OpenKey(key, name)
|
|
except OSError:
|
|
continue
|
|
try:
|
|
display_name = winreg.QueryValueEx(app_key, "DisplayName")[0]
|
|
except OSError:
|
|
winreg.CloseKey(app_key)
|
|
continue
|
|
# Use the registry key name as a stable identifier (like bundleId)
|
|
try:
|
|
install_location = winreg.QueryValueEx(app_key, "InstallLocation")[0]
|
|
except OSError:
|
|
install_location = ""
|
|
try:
|
|
display_icon = winreg.QueryValueEx(app_key, "DisplayIcon")[0]
|
|
except OSError:
|
|
display_icon = ""
|
|
normalized_icon = str(display_icon).split(",")[0].strip().strip('"')
|
|
normalized_install_location = str(install_location).strip().strip('"')
|
|
|
|
bundle_id = name
|
|
for candidate in (normalized_icon, normalized_install_location):
|
|
if not candidate:
|
|
continue
|
|
candidate_path = Path(candidate)
|
|
if candidate_path.suffix.lower() == ".exe":
|
|
bundle_id = candidate_path.stem
|
|
break
|
|
|
|
app_path = normalized_icon or normalized_install_location or ""
|
|
if bundle_id not in results:
|
|
results[bundle_id] = {
|
|
"bundleId": bundle_id,
|
|
"displayName": str(display_name),
|
|
"path": app_path,
|
|
}
|
|
winreg.CloseKey(app_key)
|
|
finally:
|
|
winreg.CloseKey(key)
|
|
|
|
return sorted(results.values(), key=lambda item: item["displayName"].lower())
|
|
|
|
|
|
def running_apps() -> list[dict[str, Any]]:
|
|
"""List running GUI applications."""
|
|
import psutil
|
|
|
|
apps: list[dict[str, Any]] = []
|
|
seen: set[str] = set()
|
|
|
|
for proc in psutil.process_iter(["pid", "name", "exe"]):
|
|
try:
|
|
name = proc.info["name"] or ""
|
|
exe_path = proc.info["exe"] or ""
|
|
if not name or name in seen:
|
|
continue
|
|
# Skip system/background processes (no window)
|
|
if not exe_path:
|
|
continue
|
|
seen.add(name)
|
|
# Use exe name (without .exe) as bundleId
|
|
bundle_id = Path(exe_path).stem if exe_path else name
|
|
apps.append({"bundleId": bundle_id, "displayName": name})
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
continue
|
|
|
|
return sorted(apps, key=lambda item: item["displayName"].lower())
|
|
|
|
|
|
def app_display_name(bundle_id: str) -> str | None:
|
|
"""Find display name for a given bundleId (exe stem or registry key)."""
|
|
import psutil
|
|
for proc in psutil.process_iter(["name", "exe"]):
|
|
try:
|
|
exe = proc.info["exe"] or ""
|
|
if exe and Path(exe).stem == bundle_id:
|
|
return proc.info["name"]
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
continue
|
|
return None
|
|
|
|
|
|
def frontmost_app() -> dict[str, str] | None:
|
|
"""Get the currently focused (foreground) application."""
|
|
import win32gui
|
|
import win32process
|
|
import psutil
|
|
|
|
hwnd = win32gui.GetForegroundWindow()
|
|
if not hwnd:
|
|
return None
|
|
try:
|
|
_, pid = win32process.GetWindowThreadProcessId(hwnd)
|
|
proc = psutil.Process(pid)
|
|
exe_path = proc.exe()
|
|
return {
|
|
"bundleId": Path(exe_path).stem,
|
|
"displayName": proc.name(),
|
|
}
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def app_under_point(x: int, y: int) -> dict[str, str] | None:
|
|
"""Find the app whose window is under the given screen coordinate."""
|
|
import win32gui
|
|
import win32process
|
|
import psutil
|
|
|
|
hwnd = win32gui.WindowFromPoint((x, y))
|
|
if not hwnd:
|
|
return frontmost_app()
|
|
# Walk up to the top-level owner
|
|
root = win32gui.GetAncestor(hwnd, 3) # GA_ROOTOWNER = 3
|
|
if root:
|
|
hwnd = root
|
|
try:
|
|
_, pid = win32process.GetWindowThreadProcessId(hwnd)
|
|
proc = psutil.Process(pid)
|
|
exe_path = proc.exe()
|
|
return {
|
|
"bundleId": Path(exe_path).stem,
|
|
"displayName": proc.name(),
|
|
}
|
|
except Exception:
|
|
return frontmost_app()
|
|
|
|
|
|
def find_window_displays(bundle_ids: list[str]) -> list[dict[str, Any]]:
|
|
"""For each bundleId, find which display(s) its windows are on."""
|
|
if not bundle_ids:
|
|
return []
|
|
|
|
displays = get_displays()
|
|
windows = list_windows()
|
|
|
|
# Build exe-stem -> ownerName mapping
|
|
names_by_bundle: dict[str, str | None] = {}
|
|
for bid in bundle_ids:
|
|
names_by_bundle[bid] = app_display_name(bid)
|
|
|
|
result = []
|
|
for bundle_id in bundle_ids:
|
|
target_name = names_by_bundle.get(bundle_id)
|
|
display_ids: set[int] = set()
|
|
for window in windows:
|
|
owner = window["ownerName"]
|
|
if not owner:
|
|
continue
|
|
# Match by exe name
|
|
owner_stem = Path(owner).stem if owner.endswith(".exe") else owner
|
|
if target_name and owner != target_name and owner_stem != bundle_id:
|
|
continue
|
|
if not target_name and owner_stem != bundle_id and owner != bundle_id:
|
|
continue
|
|
# Check which displays this window overlaps
|
|
wx = window["bounds"]["x"]
|
|
wy = window["bounds"]["y"]
|
|
ww = window["bounds"]["width"]
|
|
wh = window["bounds"]["height"]
|
|
for display in displays:
|
|
dx = display["originX"]
|
|
dy = display["originY"]
|
|
dw = display["width"]
|
|
dh = display["height"]
|
|
# Check rectangle intersection
|
|
if wx < dx + dw and wx + ww > dx and wy < dy + dh and wy + wh > dy:
|
|
display_ids.add(int(display["displayId"]))
|
|
result.append({"bundleId": bundle_id, "displayIds": sorted(display_ids)})
|
|
return result
|
|
|
|
|
|
def open_app(bundle_id: str) -> None:
|
|
"""Open an application by its bundleId (exe path or program name)."""
|
|
# Try to find the exe path from registry
|
|
import winreg
|
|
exe_path = None
|
|
|
|
reg_paths = [
|
|
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
(winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
|
|
]
|
|
for hive, sub_key in reg_paths:
|
|
try:
|
|
key = winreg.OpenKey(hive, sub_key)
|
|
i = 0
|
|
while True:
|
|
try:
|
|
name = winreg.EnumKey(key, i)
|
|
i += 1
|
|
except OSError:
|
|
break
|
|
try:
|
|
app_key = winreg.OpenKey(key, name)
|
|
except OSError:
|
|
continue
|
|
try:
|
|
display_icon = winreg.QueryValueEx(app_key, "DisplayIcon")[0]
|
|
except OSError:
|
|
display_icon = ""
|
|
try:
|
|
install_location = winreg.QueryValueEx(app_key, "InstallLocation")[0]
|
|
except OSError:
|
|
install_location = ""
|
|
|
|
normalized_icon = str(display_icon).split(",")[0].strip().strip('"')
|
|
normalized_install_location = str(install_location).strip().strip('"')
|
|
|
|
derived_bundle_id = name
|
|
for candidate in (normalized_icon, normalized_install_location):
|
|
if not candidate:
|
|
continue
|
|
candidate_path = Path(candidate)
|
|
if candidate_path.suffix.lower() == ".exe":
|
|
derived_bundle_id = candidate_path.stem
|
|
break
|
|
|
|
if name == bundle_id or derived_bundle_id == bundle_id:
|
|
exe_path = normalized_icon or normalized_install_location or None
|
|
winreg.CloseKey(app_key)
|
|
break
|
|
winreg.CloseKey(app_key)
|
|
winreg.CloseKey(key)
|
|
if exe_path:
|
|
break
|
|
except OSError:
|
|
continue
|
|
|
|
if exe_path and Path(exe_path).exists():
|
|
os.startfile(exe_path)
|
|
else:
|
|
# Fallback: try to run it directly
|
|
try:
|
|
subprocess.Popen([bundle_id], shell=True)
|
|
except Exception:
|
|
raise RuntimeError(f"App not found for identifier: {bundle_id}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Clipboard (pyperclip — cross-platform)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def read_clipboard() -> str:
|
|
import pyperclip
|
|
try:
|
|
return pyperclip.paste() or ""
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def write_clipboard(text: str) -> None:
|
|
import pyperclip
|
|
pyperclip.copy(text)
|
|
|
|
|
|
def paste_clipboard() -> None:
|
|
pyautogui.hotkey("ctrl", "v", interval=0.02)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Permissions — Windows doesn't have macOS-style TCC
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_permissions() -> dict[str, bool | None]:
|
|
"""Windows does not require explicit accessibility/screen-recording
|
|
permissions like macOS TCC. Always report as granted."""
|
|
return {
|
|
"accessibility": True,
|
|
"screenRecording": True,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Input actions (pyautogui — identical to mac_helper)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def click(x: int, y: int, button: str, count: int, modifiers: list[str] | None) -> None:
|
|
pyautogui.moveTo(x, y)
|
|
if modifiers:
|
|
normalized = [normalize_key(m) for m in modifiers]
|
|
for key in normalized:
|
|
pyautogui.keyDown(key)
|
|
try:
|
|
pyautogui.click(x=x, y=y, button=button, clicks=count, interval=0.08)
|
|
finally:
|
|
for key in reversed(normalized):
|
|
pyautogui.keyUp(key)
|
|
else:
|
|
pyautogui.click(x=x, y=y, button=button, clicks=count, interval=0.08)
|
|
|
|
|
|
def scroll(x: int, y: int, delta_x: int, delta_y: int) -> None:
|
|
pyautogui.moveTo(x, y)
|
|
if delta_y:
|
|
pyautogui.scroll(int(delta_y), x=x, y=y)
|
|
if delta_x:
|
|
pyautogui.hscroll(int(delta_x), x=x, y=y)
|
|
|
|
|
|
def key_action(sequence: str, repeat: int = 1) -> None:
|
|
parts = [normalize_key(part) for part in sequence.split("+") if part.strip()]
|
|
for _ in range(max(1, repeat)):
|
|
if len(parts) == 1:
|
|
pyautogui.press(parts[0])
|
|
else:
|
|
pyautogui.hotkey(*parts, interval=0.02)
|
|
time.sleep(0.01)
|
|
|
|
|
|
def hold_keys(keys: list[str], duration_ms: int) -> None:
|
|
normalized = [normalize_key(k) for k in keys]
|
|
for key in normalized:
|
|
pyautogui.keyDown(key)
|
|
try:
|
|
time.sleep(max(duration_ms, 0) / 1000)
|
|
finally:
|
|
for key in reversed(normalized):
|
|
pyautogui.keyUp(key)
|
|
|
|
|
|
def type_text(text: str) -> None:
|
|
pyautogui.write(text, interval=0.008)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main dispatcher — exact same command protocol as mac_helper.py
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("command")
|
|
parser.add_argument("--payload", default="{}")
|
|
args = parser.parse_args()
|
|
payload = json.loads(args.payload)
|
|
|
|
try:
|
|
command = args.command
|
|
if command == "check_permissions":
|
|
perms = check_permissions()
|
|
json_output({"ok": True, "result": perms})
|
|
return 0
|
|
if command == "list_displays":
|
|
json_output({"ok": True, "result": get_displays()})
|
|
return 0
|
|
if command == "get_display_size":
|
|
json_output({"ok": True, "result": choose_display(payload.get("displayId"))})
|
|
return 0
|
|
if command == "screenshot":
|
|
resize = None
|
|
if payload.get("targetWidth") and payload.get("targetHeight"):
|
|
resize = (int(payload["targetWidth"]), int(payload["targetHeight"]))
|
|
result = capture_display(payload.get("displayId"), resize)
|
|
json_output({"ok": True, "result": result})
|
|
return 0
|
|
if command == "resolve_prepare_capture":
|
|
resize = None
|
|
if payload.get("targetWidth") and payload.get("targetHeight"):
|
|
resize = (int(payload["targetWidth"]), int(payload["targetHeight"]))
|
|
result = capture_display(payload.get("preferredDisplayId"), resize)
|
|
result["hidden"] = []
|
|
result["resolvedDisplayId"] = result["displayId"]
|
|
json_output({"ok": True, "result": result})
|
|
return 0
|
|
if command == "zoom":
|
|
resize = None
|
|
if payload.get("targetWidth") and payload.get("targetHeight"):
|
|
resize = (int(payload["targetWidth"]), int(payload["targetHeight"]))
|
|
region = {
|
|
"left": int(payload["x"]),
|
|
"top": int(payload["y"]),
|
|
"width": int(payload["width"]),
|
|
"height": int(payload["height"]),
|
|
}
|
|
json_output({"ok": True, "result": capture_region(region, resize)})
|
|
return 0
|
|
if command == "prepare_for_action":
|
|
json_output({"ok": True, "result": []})
|
|
return 0
|
|
if command == "preview_hide_set":
|
|
json_output({"ok": True, "result": []})
|
|
return 0
|
|
if command == "find_window_displays":
|
|
json_output({"ok": True, "result": find_window_displays(list(payload.get("bundleIds") or []))})
|
|
return 0
|
|
if command == "key":
|
|
key_action(str(payload["keySequence"]), int(payload.get("repeat") or 1))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "hold_key":
|
|
hold_keys(list(payload.get("keyNames") or []), int(payload.get("durationMs") or 0))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "type":
|
|
type_text(str(payload.get("text") or ""))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "click":
|
|
click(int(payload["x"]), int(payload["y"]), str(payload.get("button") or "left"), int(payload.get("count") or 1), payload.get("modifiers"))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "drag":
|
|
from_point = payload.get("from")
|
|
if from_point:
|
|
pyautogui.moveTo(int(from_point["x"]), int(from_point["y"]))
|
|
pyautogui.dragTo(int(payload["to"]["x"]), int(payload["to"]["y"]), duration=0.2, button="left")
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "move_mouse":
|
|
pyautogui.moveTo(int(payload["x"]), int(payload["y"]))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "scroll":
|
|
scroll(int(payload["x"]), int(payload["y"]), int(payload.get("deltaX") or 0), int(payload.get("deltaY") or 0))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "mouse_down":
|
|
pyautogui.mouseDown(button="left")
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "mouse_up":
|
|
pyautogui.mouseUp(button="left")
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "cursor_position":
|
|
x, y = pyautogui.position()
|
|
json_output({"ok": True, "result": {"x": int(x), "y": int(y)}})
|
|
return 0
|
|
if command == "frontmost_app":
|
|
json_output({"ok": True, "result": frontmost_app()})
|
|
return 0
|
|
if command == "app_under_point":
|
|
json_output({"ok": True, "result": app_under_point(int(payload["x"]), int(payload["y"]))})
|
|
return 0
|
|
if command == "list_installed_apps":
|
|
json_output({"ok": True, "result": installed_apps()})
|
|
return 0
|
|
if command == "list_running_apps":
|
|
json_output({"ok": True, "result": running_apps()})
|
|
return 0
|
|
if command == "open_app":
|
|
open_app(str(payload["bundleId"]))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "read_clipboard":
|
|
json_output({"ok": True, "result": read_clipboard()})
|
|
return 0
|
|
if command == "write_clipboard":
|
|
write_clipboard(str(payload.get("text") or ""))
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
if command == "paste_clipboard":
|
|
paste_clipboard()
|
|
json_output({"ok": True, "result": True})
|
|
return 0
|
|
error_output(f"Unknown command: {command}", code="bad_command")
|
|
return 2
|
|
except Exception as exc:
|
|
error_output(str(exc))
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|