fix(personal): 收口 nodriver 断连并降低异常噪音

This commit is contained in:
genz27
2026-04-02 15:16:59 +08:00
parent 5702e1e182
commit 4b259331e5

View File

@@ -13,6 +13,7 @@ import json
import shutil
import tempfile
import subprocess
import types
from typing import Optional, Dict, Any, Iterable
from ..core.logger import debug_logger
@@ -120,6 +121,7 @@ def _ensure_nodriver_installed() -> bool:
# 尝试导入 nodriver
uc = None
NODRIVER_AVAILABLE = False
_NODRIVER_RUNTIME_PATCHED = False
if DOCKER_HEADED_BLOCKED:
debug_logger.log_warning(
@@ -143,6 +145,184 @@ else:
print(f"[BrowserCaptcha] ❌ nodriver 导入失败: {e}")
_RUNTIME_ERROR_KEYWORDS = (
"has been closed",
"browser has been closed",
"target closed",
"connection closed",
"connection lost",
"connection refused",
"connection reset",
"broken pipe",
"session closed",
"not attached to an active page",
"no session with given id",
"cannot find context with specified id",
"websocket is not open",
"no close frame received or sent",
"cannot call write to closing transport",
"cannot write to closing transport",
"cannot call send once a close message has been sent",
"connectionclosederror",
"connectionrefusederror",
"disconnected",
"errno 111",
)
def _flatten_exception_text(error: Any) -> str:
"""拼接异常链文本,便于统一识别 nodriver 运行态断连。"""
visited: set[int] = set()
pending = [error]
parts: list[str] = []
while pending:
current = pending.pop()
if current is None:
continue
current_id = id(current)
if current_id in visited:
continue
visited.add(current_id)
parts.append(type(current).__name__)
message = str(current or "").strip()
if message:
parts.append(message)
args = getattr(current, "args", None)
if isinstance(args, tuple):
for arg in args:
arg_text = str(arg or "").strip()
if arg_text:
parts.append(arg_text)
pending.append(getattr(current, "__cause__", None))
pending.append(getattr(current, "__context__", None))
return " | ".join(parts).lower()
def _is_runtime_disconnect_error(error: Any) -> bool:
"""识别浏览器 / websocket 运行态断连。"""
error_text = _flatten_exception_text(error)
if not error_text:
return False
return any(keyword in error_text for keyword in _RUNTIME_ERROR_KEYWORDS)
def _finalize_nodriver_send_task(connection, transaction, tx_id: int, task: asyncio.Task):
"""回收 nodriver websocket.send 的后台异常,避免事件循环打印未检索 task 错误。"""
try:
task.result()
except asyncio.CancelledError:
connection.mapper.pop(tx_id, None)
if not transaction.done():
transaction.cancel()
except Exception as e:
connection.mapper.pop(tx_id, None)
if not transaction.done():
try:
transaction.set_exception(e)
except Exception:
pass
if _is_runtime_disconnect_error(e):
debug_logger.log_warning(
f"[BrowserCaptcha] nodriver websocket 发送在断连后退出: {type(e).__name__}: {e}"
)
else:
debug_logger.log_warning(
f"[BrowserCaptcha] nodriver websocket 发送异常: {type(e).__name__}: {e}"
)
def _patch_nodriver_connection_instance(connection_instance):
"""在连接实例级别收口 websocket.send 的后台异常。"""
if not connection_instance or getattr(connection_instance, "_flow2api_send_patched", False):
return
try:
from nodriver.core import connection as nodriver_connection_module
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] 加载 nodriver.connection 失败,跳过连接补丁: {e}")
return
async def patched_send(self, cdp_obj, _is_update=False):
if self.closed:
await self.connect()
if not _is_update:
await self._register_handlers()
transaction = nodriver_connection_module.Transaction(cdp_obj)
tx_id = next(self.__count__)
transaction.id = tx_id
self.mapper[tx_id] = transaction
send_task = asyncio.create_task(self.websocket.send(transaction.message))
send_task.add_done_callback(
lambda task, connection=self, tx=transaction, current_tx_id=tx_id:
_finalize_nodriver_send_task(connection, tx, current_tx_id, task)
)
return await transaction
connection_instance.send = types.MethodType(patched_send, connection_instance)
connection_instance._flow2api_send_patched = True
def _patch_nodriver_browser_instance(browser_instance):
"""在浏览器实例级别收口 update_targets并补齐新 target 的连接补丁。"""
if not browser_instance:
return
_patch_nodriver_connection_instance(getattr(browser_instance, "connection", None))
for target in list(getattr(browser_instance, "targets", []) or []):
_patch_nodriver_connection_instance(target)
if getattr(browser_instance, "_flow2api_update_targets_patched", False):
return
original_update_targets = browser_instance.update_targets
async def patched_update_targets(self, *args, **kwargs):
try:
result = await original_update_targets(*args, **kwargs)
except asyncio.CancelledError:
raise
except Exception as e:
if _is_runtime_disconnect_error(e):
debug_logger.log_warning(
f"[BrowserCaptcha] nodriver.update_targets 在浏览器断连后退出: {type(e).__name__}: {e}"
)
return []
raise
_patch_nodriver_connection_instance(getattr(self, "connection", None))
for target in list(getattr(self, "targets", []) or []):
_patch_nodriver_connection_instance(target)
return result
browser_instance.update_targets = types.MethodType(patched_update_targets, browser_instance)
browser_instance._flow2api_update_targets_patched = True
def _patch_nodriver_runtime(browser_instance=None):
"""给 nodriver 当前浏览器实例补一层断连降噪与异常透传。"""
global _NODRIVER_RUNTIME_PATCHED
if not NODRIVER_AVAILABLE or uc is None:
return
if browser_instance is not None:
_patch_nodriver_browser_instance(browser_instance)
if not _NODRIVER_RUNTIME_PATCHED:
_NODRIVER_RUNTIME_PATCHED = True
debug_logger.log_info("[BrowserCaptcha] 已启用 nodriver 运行态安全补丁")
def _parse_proxy_url(proxy_url: str):
"""Parse a proxy URL into (protocol, host, port, username, password)."""
if not proxy_url:
@@ -351,24 +531,7 @@ class BrowserCaptchaService:
def _is_browser_runtime_error(self, error: Any) -> bool:
"""识别浏览器运行态已损坏/已关闭的典型异常。"""
error_text = str(error or "").strip().lower()
if not error_text:
return False
runtime_keywords = [
"has been closed",
"browser has been closed",
"target closed",
"connection closed",
"connection lost",
"session closed",
"not attached to an active page",
"no session with given id",
"cannot find context with specified id",
"websocket is not open",
"disconnected",
]
return any(keyword in error_text for keyword in runtime_keywords)
return _is_runtime_disconnect_error(error)
async def _probe_browser_runtime(self) -> bool:
"""轻量探测当前 nodriver 连接是否仍可用。"""
@@ -746,10 +909,42 @@ class BrowserCaptchaService:
except Exception:
pass
async def _stop_browser_process(self, browser_instance):
async def _disconnect_browser_connection_quietly(self, browser_instance, reason: str):
"""尽量先关闭 DevTools websocket减少 nodriver 后台任务在浏览器退场时炸栈。"""
if not browser_instance:
return
connection = getattr(browser_instance, "connection", None)
disconnect_method = getattr(connection, "disconnect", None) if connection else None
if disconnect_method is None:
return
try:
result = disconnect_method()
if inspect.isawaitable(result):
await self._run_with_timeout(
result,
timeout_seconds=5.0,
label=f"browser.disconnect:{reason}",
)
await asyncio.sleep(0)
except Exception as e:
if self._is_browser_runtime_error(e):
debug_logger.log_warning(
f"[BrowserCaptcha] 浏览器连接关闭时检测到已断连状态 ({reason}): {e}"
)
return
debug_logger.log_warning(
f"[BrowserCaptcha] 浏览器连接关闭异常 ({reason}): {type(e).__name__}: {e}"
)
async def _stop_browser_process(self, browser_instance, reason: str = "browser_stop"):
"""兼容 nodriver 同步 stop API安全停止浏览器进程。"""
if not browser_instance:
return
await self._disconnect_browser_connection_quietly(browser_instance, reason=reason)
stop_method = getattr(browser_instance, "stop", None)
if stop_method is None:
return
@@ -800,7 +995,7 @@ class BrowserCaptchaService:
if browser_instance:
try:
await self._stop_browser_process(browser_instance)
await self._stop_browser_process(browser_instance, reason=reason)
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] 停止浏览器实例失败 ({reason}): {e}"
@@ -859,6 +1054,7 @@ class BrowserCaptchaService:
debug_logger.log_warning("[BrowserCaptcha] 浏览器连接已失活,准备重新初始化...")
browser_needs_restart = True
else:
_patch_nodriver_runtime(self.browser)
if self._idle_reaper_task is None or self._idle_reaper_task.done():
self._idle_reaper_task = asyncio.create_task(self._idle_tab_reaper_loop())
return
@@ -1009,6 +1205,7 @@ class BrowserCaptchaService:
label="nodriver.start.retry_no_sandbox",
)
_patch_nodriver_runtime(self.browser)
self._initialized = True
if self._idle_reaper_task is None or self._idle_reaper_task.done():
self._idle_reaper_task = asyncio.create_task(self._idle_tab_reaper_loop())