fix(personal): 修复 token 结果解析并收口恢复竞态

This commit is contained in:
genz27
2026-04-02 17:17:24 +08:00
parent 4f2c7520e1
commit 6e5d7cb9e9
2 changed files with 209 additions and 16 deletions

View File

@@ -461,6 +461,7 @@ class BrowserCaptchaService:
self._resident_pick_index = 0
self._resident_lock = asyncio.Lock() # 保护常驻标签页操作
self._browser_lock = asyncio.Lock() # 保护浏览器初始化/关闭/重启,避免重复拉起实例
self._runtime_recover_lock = asyncio.Lock() # 串行化浏览器级恢复,避免并发重启风暴
self._tab_build_lock = asyncio.Lock() # 串行化冷启动/重建,降低 nodriver 抖动
self._legacy_lock = asyncio.Lock() # 避免 legacy fallback 并发失控创建临时标签页
self._max_resident_tabs = 5 # 最大常驻标签页数量(支持并发)
@@ -489,6 +490,7 @@ class BrowserCaptchaService:
self._recaptcha_ready = False # 向后兼容
self._last_fingerprint: Optional[Dict[str, Any]] = None
self._resident_error_streaks: dict[str, int] = {}
self._last_runtime_restart_at = 0.0
self._proxy_url: Optional[str] = None
self._proxy_ext_dir: Optional[str] = None
# 自定义站点打码常驻页(用于 score-test
@@ -622,10 +624,76 @@ class BrowserCaptchaService:
self._last_health_probe_at = 0.0
self._last_health_probe_ok = False
def _mark_runtime_restart(self):
self._last_runtime_restart_at = time.time()
def _was_runtime_restarted_recently(self, window_seconds: float = 5.0) -> bool:
if self._last_runtime_restart_at <= 0.0:
return False
return (time.time() - self._last_runtime_restart_at) <= max(0.0, window_seconds)
def _is_browser_runtime_error(self, error: Any) -> bool:
"""识别浏览器运行态已损坏/已关闭的典型异常。"""
return _is_runtime_disconnect_error(error)
def _decode_nodriver_object_entries(self, value: Any) -> Optional[Dict[str, Any]]:
if not isinstance(value, list):
return None
result: Dict[str, Any] = {}
for entry in value:
if not isinstance(entry, (list, tuple)) or len(entry) != 2:
return None
key, entry_value = entry
if not isinstance(key, str):
return None
result[key] = self._normalize_nodriver_evaluate_result(entry_value)
return result
def _normalize_nodriver_evaluate_result(self, value: Any) -> Any:
if value is None:
return None
deep_serialized_value = getattr(value, "deep_serialized_value", None)
if deep_serialized_value is not None:
return self._normalize_nodriver_evaluate_result(deep_serialized_value)
type_name = getattr(value, "type_", None)
if type_name is not None and hasattr(value, "value"):
raw_value = getattr(value, "value", None)
if type_name == "object":
object_entries = self._decode_nodriver_object_entries(raw_value)
if object_entries is not None:
return object_entries
if raw_value is not None:
return self._normalize_nodriver_evaluate_result(raw_value)
unserializable_value = getattr(value, "unserializable_value", None)
if unserializable_value is not None:
return str(unserializable_value)
return value
if isinstance(value, dict):
typed_value_keys = {"type", "value", "objectId", "weakLocalObjectReference"}
if "type" in value and set(value.keys()).issubset(typed_value_keys):
raw_value = value.get("value")
if value.get("type") == "object":
object_entries = self._decode_nodriver_object_entries(raw_value)
if object_entries is not None:
return object_entries
return self._normalize_nodriver_evaluate_result(raw_value)
return {
key: self._normalize_nodriver_evaluate_result(item)
for key, item in value.items()
}
if isinstance(value, list):
object_entries = self._decode_nodriver_object_entries(value)
if object_entries is not None:
return object_entries
return [self._normalize_nodriver_evaluate_result(item) for item in value]
return value
async def _probe_browser_runtime(self) -> bool:
"""轻量探测当前 nodriver 连接是否仍可用。"""
if not self.browser:
@@ -651,24 +719,38 @@ class BrowserCaptchaService:
async def _recover_browser_runtime(self, project_id: Optional[str] = None, reason: str = "runtime_error") -> bool:
"""浏览器运行态损坏时,优先整颗浏览器重启并恢复 resident 池。"""
normalized_project_id = str(project_id or "").strip()
self._invalidate_browser_health()
async with self._runtime_recover_lock:
if self.browser and self._initialized and not getattr(self.browser, "stopped", False):
try:
if await self._probe_browser_runtime():
debug_logger.log_info(
f"[BrowserCaptcha] 浏览器运行态已被并发协程恢复,直接复用 (project_id={normalized_project_id or '<empty>'}, reason={reason})"
)
return True
except Exception:
pass
self._invalidate_browser_health()
if normalized_project_id:
try:
if await self._restart_browser_for_project_unlocked(normalized_project_id):
self._mark_runtime_restart()
return True
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] 浏览器重启恢复失败 (project_id={normalized_project_id}, reason={reason}): {e}"
)
if normalized_project_id:
try:
if await self._restart_browser_for_project(normalized_project_id):
return True
await self._shutdown_browser_runtime(cancel_idle_reaper=False, reason=f"recover:{reason}")
await self.initialize()
self._mark_runtime_restart()
return True
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] 浏览器重启恢复失败 (project_id={normalized_project_id}, reason={reason}): {e}"
)
debug_logger.log_error(f"[BrowserCaptcha] 浏览器运行态恢复失败 ({reason}): {e}")
return False
try:
await self._shutdown_browser_runtime(cancel_idle_reaper=False, reason=f"recover:{reason}")
await self.initialize()
return True
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] 浏览器运行态恢复失败 ({reason}): {e}")
return False
async def _tab_evaluate(
self,
tab,
@@ -679,7 +761,7 @@ class BrowserCaptchaService:
await_promise: bool = False,
return_by_value: bool = False,
):
return await self._run_with_timeout(
result = await self._run_with_timeout(
tab.evaluate(
script,
await_promise=await_promise,
@@ -688,6 +770,9 @@ class BrowserCaptchaService:
timeout_seconds or self._command_timeout_seconds,
label,
)
if return_by_value:
return self._normalize_nodriver_evaluate_result(result)
return result
async def _tab_get(self, tab, url: str, label: str, timeout_seconds: Optional[float] = None):
return await self._run_with_timeout(
@@ -1605,6 +1690,29 @@ class BrowserCaptchaService:
return True
async def _restart_browser_for_project(self, project_id: str) -> bool:
async with self._runtime_recover_lock:
if self._was_runtime_restarted_recently():
try:
if await self._probe_browser_runtime():
slot_id, resident_info = await self._ensure_resident_tab(project_id, return_slot_key=True)
if resident_info is not None and slot_id:
self._remember_project_affinity(project_id, slot_id, resident_info)
self._resident_error_streaks.pop(slot_id, None)
debug_logger.log_warning(
f"[BrowserCaptcha] project_id={project_id} 检测到最近已完成浏览器恢复,复用当前运行态 (slot={slot_id})"
)
return True
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] project_id={project_id} 复用最近恢复运行态失败,继续执行整浏览器重启: {e}"
)
restarted = await self._restart_browser_for_project_unlocked(project_id)
if restarted:
self._mark_runtime_restart()
return restarted
async def _restart_browser_for_project_unlocked(self, project_id: str) -> bool:
"""重启整个 nodriver 浏览器,并恢复共享打码池。"""
async with self._resident_lock:
restore_slots = max(1, min(self._max_resident_tabs, len(self._resident_tabs) or 1))
@@ -2375,7 +2483,14 @@ class BrowserCaptchaService:
existing_tabs = [info.tab for info in self._resident_tabs.values() if info.tab]
# 获取或创建标签页
tabs = self.browser.tabs
browser = self.browser
if browser is None or getattr(browser, "stopped", False):
debug_logger.log_warning(
f"[BrowserCaptcha] 创建共享常驻标签页前浏览器不可用 (slot={slot_id}, project={project_id})"
)
return None
tabs = list(getattr(browser, "tabs", []) or [])
available_tab = None
# 查找未被占用的标签页

View File

@@ -0,0 +1,78 @@
import types
import unittest
from unittest.mock import AsyncMock
from src.services.browser_captcha_personal import BrowserCaptchaService, ResidentTabInfo
class _FakeTab:
def __init__(self, result):
self._result = result
async def evaluate(self, expression, await_promise=False, return_by_value=False):
return self._result
class BrowserCaptchaPersonalTests(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.service = BrowserCaptchaService()
@staticmethod
def _make_remote_object_result(token: str):
return types.SimpleNamespace(
type_="object",
value=None,
deep_serialized_value=types.SimpleNamespace(
type_="object",
value=[
["ok", {"type": "boolean", "value": True}],
["token", {"type": "string", "value": token}],
],
),
)
async def test_tab_evaluate_normalizes_deep_serialized_remote_object(self):
tab = _FakeTab(self._make_remote_object_result("token-123"))
result = await self.service._tab_evaluate(
tab,
"ignored",
label="unit_test_tab_evaluate",
await_promise=True,
return_by_value=True,
)
self.assertEqual(result, {"ok": True, "token": "token-123"})
async def test_execute_recaptcha_on_tab_accepts_remote_object_success_result(self):
tab = _FakeTab(self._make_remote_object_result("token-xyz"))
token = await self.service._execute_recaptcha_on_tab(tab, action="IMAGE_GENERATION")
self.assertEqual(token, "token-xyz")
async def test_create_resident_tab_returns_none_when_browser_missing(self):
self.service.browser = None
resident_info = await self.service._create_resident_tab("slot-1", project_id="project-1")
self.assertIsNone(resident_info)
async def test_restart_browser_for_project_reuses_recent_healthy_runtime(self):
resident_info = ResidentTabInfo(tab=object(), slot_id="slot-1", project_id="project-1")
self.service.browser = types.SimpleNamespace(stopped=False)
self.service._initialized = True
self.service._mark_runtime_restart()
self.service._probe_browser_runtime = AsyncMock(return_value=True)
self.service._ensure_resident_tab = AsyncMock(return_value=("slot-1", resident_info))
self.service._restart_browser_for_project_unlocked = AsyncMock(return_value=True)
result = await self.service._restart_browser_for_project("project-1")
self.assertTrue(result)
self.service._restart_browser_for_project_unlocked.assert_not_awaited()
self.service._ensure_resident_tab.assert_awaited_once()
if __name__ == "__main__":
unittest.main()