From 4b259331e5f52bce1fc519bd14c4efd4d3f6ec02 Mon Sep 17 00:00:00 2001 From: genz27 Date: Thu, 2 Apr 2026 15:16:59 +0800 Subject: [PATCH] =?UTF-8?q?fix(personal):=20=E6=94=B6=E5=8F=A3=20nodriver?= =?UTF-8?q?=20=E6=96=AD=E8=BF=9E=E5=B9=B6=E9=99=8D=E4=BD=8E=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E5=99=AA=E9=9F=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/browser_captcha_personal.py | 237 +++++++++++++++++++++-- 1 file changed, 217 insertions(+), 20 deletions(-) diff --git a/src/services/browser_captcha_personal.py b/src/services/browser_captcha_personal.py index 2f599c1..3271eae 100644 --- a/src/services/browser_captcha_personal.py +++ b/src/services/browser_captcha_personal.py @@ -13,6 +13,7 @@ import json import shutil import tempfile import subprocess +import types from typing import Optional, Dict, Any, Iterable from ..core.logger import debug_logger @@ -120,6 +121,7 @@ def _ensure_nodriver_installed() -> bool: # 尝试导入 nodriver uc = None NODRIVER_AVAILABLE = False +_NODRIVER_RUNTIME_PATCHED = False if DOCKER_HEADED_BLOCKED: debug_logger.log_warning( @@ -143,6 +145,184 @@ else: print(f"[BrowserCaptcha] ❌ nodriver 导入失败: {e}") +_RUNTIME_ERROR_KEYWORDS = ( + "has been closed", + "browser has been closed", + "target closed", + "connection closed", + "connection lost", + "connection refused", + "connection reset", + "broken pipe", + "session closed", + "not attached to an active page", + "no session with given id", + "cannot find context with specified id", + "websocket is not open", + "no close frame received or sent", + "cannot call write to closing transport", + "cannot write to closing transport", + "cannot call send once a close message has been sent", + "connectionclosederror", + "connectionrefusederror", + "disconnected", + "errno 111", +) + + +def _flatten_exception_text(error: Any) -> str: + """拼接异常链文本,便于统一识别 nodriver 运行态断连。""" + visited: set[int] = set() + pending = [error] + parts: list[str] = [] + + while pending: + current = pending.pop() + if current is None: + continue + + current_id = id(current) + if current_id in visited: + continue + visited.add(current_id) + + parts.append(type(current).__name__) + + message = str(current or "").strip() + if message: + parts.append(message) + + args = getattr(current, "args", None) + if isinstance(args, tuple): + for arg in args: + arg_text = str(arg or "").strip() + if arg_text: + parts.append(arg_text) + + pending.append(getattr(current, "__cause__", None)) + pending.append(getattr(current, "__context__", None)) + + return " | ".join(parts).lower() + + +def _is_runtime_disconnect_error(error: Any) -> bool: + """识别浏览器 / websocket 运行态断连。""" + error_text = _flatten_exception_text(error) + if not error_text: + return False + return any(keyword in error_text for keyword in _RUNTIME_ERROR_KEYWORDS) + + +def _finalize_nodriver_send_task(connection, transaction, tx_id: int, task: asyncio.Task): + """回收 nodriver websocket.send 的后台异常,避免事件循环打印未检索 task 错误。""" + try: + task.result() + except asyncio.CancelledError: + connection.mapper.pop(tx_id, None) + if not transaction.done(): + transaction.cancel() + except Exception as e: + connection.mapper.pop(tx_id, None) + if not transaction.done(): + try: + transaction.set_exception(e) + except Exception: + pass + + if _is_runtime_disconnect_error(e): + debug_logger.log_warning( + f"[BrowserCaptcha] nodriver websocket 发送在断连后退出: {type(e).__name__}: {e}" + ) + else: + debug_logger.log_warning( + f"[BrowserCaptcha] nodriver websocket 发送异常: {type(e).__name__}: {e}" + ) + + +def _patch_nodriver_connection_instance(connection_instance): + """在连接实例级别收口 websocket.send 的后台异常。""" + if not connection_instance or getattr(connection_instance, "_flow2api_send_patched", False): + return + + try: + from nodriver.core import connection as nodriver_connection_module + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] 加载 nodriver.connection 失败,跳过连接补丁: {e}") + return + + async def patched_send(self, cdp_obj, _is_update=False): + if self.closed: + await self.connect() + if not _is_update: + await self._register_handlers() + + transaction = nodriver_connection_module.Transaction(cdp_obj) + tx_id = next(self.__count__) + transaction.id = tx_id + self.mapper[tx_id] = transaction + + send_task = asyncio.create_task(self.websocket.send(transaction.message)) + send_task.add_done_callback( + lambda task, connection=self, tx=transaction, current_tx_id=tx_id: + _finalize_nodriver_send_task(connection, tx, current_tx_id, task) + ) + return await transaction + + connection_instance.send = types.MethodType(patched_send, connection_instance) + connection_instance._flow2api_send_patched = True + + +def _patch_nodriver_browser_instance(browser_instance): + """在浏览器实例级别收口 update_targets,并补齐新 target 的连接补丁。""" + if not browser_instance: + return + + _patch_nodriver_connection_instance(getattr(browser_instance, "connection", None)) + for target in list(getattr(browser_instance, "targets", []) or []): + _patch_nodriver_connection_instance(target) + + if getattr(browser_instance, "_flow2api_update_targets_patched", False): + return + + original_update_targets = browser_instance.update_targets + + async def patched_update_targets(self, *args, **kwargs): + try: + result = await original_update_targets(*args, **kwargs) + except asyncio.CancelledError: + raise + except Exception as e: + if _is_runtime_disconnect_error(e): + debug_logger.log_warning( + f"[BrowserCaptcha] nodriver.update_targets 在浏览器断连后退出: {type(e).__name__}: {e}" + ) + return [] + raise + + _patch_nodriver_connection_instance(getattr(self, "connection", None)) + for target in list(getattr(self, "targets", []) or []): + _patch_nodriver_connection_instance(target) + return result + + browser_instance.update_targets = types.MethodType(patched_update_targets, browser_instance) + browser_instance._flow2api_update_targets_patched = True + + +def _patch_nodriver_runtime(browser_instance=None): + """给 nodriver 当前浏览器实例补一层断连降噪与异常透传。""" + global _NODRIVER_RUNTIME_PATCHED + + if not NODRIVER_AVAILABLE or uc is None: + return + + if browser_instance is not None: + _patch_nodriver_browser_instance(browser_instance) + + if not _NODRIVER_RUNTIME_PATCHED: + _NODRIVER_RUNTIME_PATCHED = True + debug_logger.log_info("[BrowserCaptcha] 已启用 nodriver 运行态安全补丁") + + def _parse_proxy_url(proxy_url: str): """Parse a proxy URL into (protocol, host, port, username, password).""" if not proxy_url: @@ -351,24 +531,7 @@ class BrowserCaptchaService: def _is_browser_runtime_error(self, error: Any) -> bool: """识别浏览器运行态已损坏/已关闭的典型异常。""" - error_text = str(error or "").strip().lower() - if not error_text: - return False - - runtime_keywords = [ - "has been closed", - "browser has been closed", - "target closed", - "connection closed", - "connection lost", - "session closed", - "not attached to an active page", - "no session with given id", - "cannot find context with specified id", - "websocket is not open", - "disconnected", - ] - return any(keyword in error_text for keyword in runtime_keywords) + return _is_runtime_disconnect_error(error) async def _probe_browser_runtime(self) -> bool: """轻量探测当前 nodriver 连接是否仍可用。""" @@ -746,10 +909,42 @@ class BrowserCaptchaService: except Exception: pass - async def _stop_browser_process(self, browser_instance): + async def _disconnect_browser_connection_quietly(self, browser_instance, reason: str): + """尽量先关闭 DevTools websocket,减少 nodriver 后台任务在浏览器退场时炸栈。""" + if not browser_instance: + return + + connection = getattr(browser_instance, "connection", None) + disconnect_method = getattr(connection, "disconnect", None) if connection else None + if disconnect_method is None: + return + + try: + result = disconnect_method() + if inspect.isawaitable(result): + await self._run_with_timeout( + result, + timeout_seconds=5.0, + label=f"browser.disconnect:{reason}", + ) + await asyncio.sleep(0) + except Exception as e: + if self._is_browser_runtime_error(e): + debug_logger.log_warning( + f"[BrowserCaptcha] 浏览器连接关闭时检测到已断连状态 ({reason}): {e}" + ) + return + debug_logger.log_warning( + f"[BrowserCaptcha] 浏览器连接关闭异常 ({reason}): {type(e).__name__}: {e}" + ) + + async def _stop_browser_process(self, browser_instance, reason: str = "browser_stop"): """兼容 nodriver 同步 stop API,安全停止浏览器进程。""" if not browser_instance: return + + await self._disconnect_browser_connection_quietly(browser_instance, reason=reason) + stop_method = getattr(browser_instance, "stop", None) if stop_method is None: return @@ -800,7 +995,7 @@ class BrowserCaptchaService: if browser_instance: try: - await self._stop_browser_process(browser_instance) + await self._stop_browser_process(browser_instance, reason=reason) except Exception as e: debug_logger.log_warning( f"[BrowserCaptcha] 停止浏览器实例失败 ({reason}): {e}" @@ -859,6 +1054,7 @@ class BrowserCaptchaService: debug_logger.log_warning("[BrowserCaptcha] 浏览器连接已失活,准备重新初始化...") browser_needs_restart = True else: + _patch_nodriver_runtime(self.browser) if self._idle_reaper_task is None or self._idle_reaper_task.done(): self._idle_reaper_task = asyncio.create_task(self._idle_tab_reaper_loop()) return @@ -1009,6 +1205,7 @@ class BrowserCaptchaService: label="nodriver.start.retry_no_sandbox", ) + _patch_nodriver_runtime(self.browser) self._initialized = True if self._idle_reaper_task is None or self._idle_reaper_task.done(): self._idle_reaper_task = asyncio.create_task(self._idle_tab_reaper_loop())