From 6e5d7cb9e9c0ba11fdfa488b51cbbe03ea6d9312 Mon Sep 17 00:00:00 2001 From: genz27 Date: Thu, 2 Apr 2026 17:17:24 +0800 Subject: [PATCH] =?UTF-8?q?fix(personal):=20=E4=BF=AE=E5=A4=8D=20token=20?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E8=A7=A3=E6=9E=90=E5=B9=B6=E6=94=B6=E5=8F=A3?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E7=AB=9E=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/browser_captcha_personal.py | 147 ++++++++++++++++++++--- tests/test_browser_captcha_personal.py | 78 ++++++++++++ 2 files changed, 209 insertions(+), 16 deletions(-) create mode 100644 tests/test_browser_captcha_personal.py diff --git a/src/services/browser_captcha_personal.py b/src/services/browser_captcha_personal.py index 500e874..20d24c1 100644 --- a/src/services/browser_captcha_personal.py +++ b/src/services/browser_captcha_personal.py @@ -461,6 +461,7 @@ class BrowserCaptchaService: self._resident_pick_index = 0 self._resident_lock = asyncio.Lock() # 保护常驻标签页操作 self._browser_lock = asyncio.Lock() # 保护浏览器初始化/关闭/重启,避免重复拉起实例 + self._runtime_recover_lock = asyncio.Lock() # 串行化浏览器级恢复,避免并发重启风暴 self._tab_build_lock = asyncio.Lock() # 串行化冷启动/重建,降低 nodriver 抖动 self._legacy_lock = asyncio.Lock() # 避免 legacy fallback 并发失控创建临时标签页 self._max_resident_tabs = 5 # 最大常驻标签页数量(支持并发) @@ -489,6 +490,7 @@ class BrowserCaptchaService: self._recaptcha_ready = False # 向后兼容 self._last_fingerprint: Optional[Dict[str, Any]] = None self._resident_error_streaks: dict[str, int] = {} + self._last_runtime_restart_at = 0.0 self._proxy_url: Optional[str] = None self._proxy_ext_dir: Optional[str] = None # 自定义站点打码常驻页(用于 score-test) @@ -622,10 +624,76 @@ class BrowserCaptchaService: self._last_health_probe_at = 0.0 self._last_health_probe_ok = False + def _mark_runtime_restart(self): + self._last_runtime_restart_at = time.time() + + def _was_runtime_restarted_recently(self, window_seconds: float = 5.0) -> bool: + if self._last_runtime_restart_at <= 0.0: + return False + return (time.time() - self._last_runtime_restart_at) <= max(0.0, window_seconds) + def _is_browser_runtime_error(self, error: Any) -> bool: """识别浏览器运行态已损坏/已关闭的典型异常。""" return _is_runtime_disconnect_error(error) + def _decode_nodriver_object_entries(self, value: Any) -> Optional[Dict[str, Any]]: + if not isinstance(value, list): + return None + + result: Dict[str, Any] = {} + for entry in value: + if not isinstance(entry, (list, tuple)) or len(entry) != 2: + return None + key, entry_value = entry + if not isinstance(key, str): + return None + result[key] = self._normalize_nodriver_evaluate_result(entry_value) + return result + + def _normalize_nodriver_evaluate_result(self, value: Any) -> Any: + if value is None: + return None + + deep_serialized_value = getattr(value, "deep_serialized_value", None) + if deep_serialized_value is not None: + return self._normalize_nodriver_evaluate_result(deep_serialized_value) + + type_name = getattr(value, "type_", None) + if type_name is not None and hasattr(value, "value"): + raw_value = getattr(value, "value", None) + if type_name == "object": + object_entries = self._decode_nodriver_object_entries(raw_value) + if object_entries is not None: + return object_entries + if raw_value is not None: + return self._normalize_nodriver_evaluate_result(raw_value) + unserializable_value = getattr(value, "unserializable_value", None) + if unserializable_value is not None: + return str(unserializable_value) + return value + + if isinstance(value, dict): + typed_value_keys = {"type", "value", "objectId", "weakLocalObjectReference"} + if "type" in value and set(value.keys()).issubset(typed_value_keys): + raw_value = value.get("value") + if value.get("type") == "object": + object_entries = self._decode_nodriver_object_entries(raw_value) + if object_entries is not None: + return object_entries + return self._normalize_nodriver_evaluate_result(raw_value) + return { + key: self._normalize_nodriver_evaluate_result(item) + for key, item in value.items() + } + + if isinstance(value, list): + object_entries = self._decode_nodriver_object_entries(value) + if object_entries is not None: + return object_entries + return [self._normalize_nodriver_evaluate_result(item) for item in value] + + return value + async def _probe_browser_runtime(self) -> bool: """轻量探测当前 nodriver 连接是否仍可用。""" if not self.browser: @@ -651,24 +719,38 @@ class BrowserCaptchaService: async def _recover_browser_runtime(self, project_id: Optional[str] = None, reason: str = "runtime_error") -> bool: """浏览器运行态损坏时,优先整颗浏览器重启并恢复 resident 池。""" normalized_project_id = str(project_id or "").strip() - self._invalidate_browser_health() + async with self._runtime_recover_lock: + if self.browser and self._initialized and not getattr(self.browser, "stopped", False): + try: + if await self._probe_browser_runtime(): + debug_logger.log_info( + f"[BrowserCaptcha] 浏览器运行态已被并发协程恢复,直接复用 (project_id={normalized_project_id or ''}, reason={reason})" + ) + return True + except Exception: + pass + + self._invalidate_browser_health() + + if normalized_project_id: + try: + if await self._restart_browser_for_project_unlocked(normalized_project_id): + self._mark_runtime_restart() + return True + except Exception as e: + debug_logger.log_warning( + f"[BrowserCaptcha] 浏览器重启恢复失败 (project_id={normalized_project_id}, reason={reason}): {e}" + ) - if normalized_project_id: try: - if await self._restart_browser_for_project(normalized_project_id): - return True + await self._shutdown_browser_runtime(cancel_idle_reaper=False, reason=f"recover:{reason}") + await self.initialize() + self._mark_runtime_restart() + return True except Exception as e: - debug_logger.log_warning( - f"[BrowserCaptcha] 浏览器重启恢复失败 (project_id={normalized_project_id}, reason={reason}): {e}" - ) + debug_logger.log_error(f"[BrowserCaptcha] 浏览器运行态恢复失败 ({reason}): {e}") + return False - try: - await self._shutdown_browser_runtime(cancel_idle_reaper=False, reason=f"recover:{reason}") - await self.initialize() - return True - except Exception as e: - debug_logger.log_error(f"[BrowserCaptcha] 浏览器运行态恢复失败 ({reason}): {e}") - return False async def _tab_evaluate( self, tab, @@ -679,7 +761,7 @@ class BrowserCaptchaService: await_promise: bool = False, return_by_value: bool = False, ): - return await self._run_with_timeout( + result = await self._run_with_timeout( tab.evaluate( script, await_promise=await_promise, @@ -688,6 +770,9 @@ class BrowserCaptchaService: timeout_seconds or self._command_timeout_seconds, label, ) + if return_by_value: + return self._normalize_nodriver_evaluate_result(result) + return result async def _tab_get(self, tab, url: str, label: str, timeout_seconds: Optional[float] = None): return await self._run_with_timeout( @@ -1605,6 +1690,29 @@ class BrowserCaptchaService: return True async def _restart_browser_for_project(self, project_id: str) -> bool: + async with self._runtime_recover_lock: + if self._was_runtime_restarted_recently(): + try: + if await self._probe_browser_runtime(): + slot_id, resident_info = await self._ensure_resident_tab(project_id, return_slot_key=True) + if resident_info is not None and slot_id: + self._remember_project_affinity(project_id, slot_id, resident_info) + self._resident_error_streaks.pop(slot_id, None) + debug_logger.log_warning( + f"[BrowserCaptcha] project_id={project_id} 检测到最近已完成浏览器恢复,复用当前运行态 (slot={slot_id})" + ) + return True + except Exception as e: + debug_logger.log_warning( + f"[BrowserCaptcha] project_id={project_id} 复用最近恢复运行态失败,继续执行整浏览器重启: {e}" + ) + + restarted = await self._restart_browser_for_project_unlocked(project_id) + if restarted: + self._mark_runtime_restart() + return restarted + + async def _restart_browser_for_project_unlocked(self, project_id: str) -> bool: """重启整个 nodriver 浏览器,并恢复共享打码池。""" async with self._resident_lock: restore_slots = max(1, min(self._max_resident_tabs, len(self._resident_tabs) or 1)) @@ -2375,7 +2483,14 @@ class BrowserCaptchaService: existing_tabs = [info.tab for info in self._resident_tabs.values() if info.tab] # 获取或创建标签页 - tabs = self.browser.tabs + browser = self.browser + if browser is None or getattr(browser, "stopped", False): + debug_logger.log_warning( + f"[BrowserCaptcha] 创建共享常驻标签页前浏览器不可用 (slot={slot_id}, project={project_id})" + ) + return None + + tabs = list(getattr(browser, "tabs", []) or []) available_tab = None # 查找未被占用的标签页 diff --git a/tests/test_browser_captcha_personal.py b/tests/test_browser_captcha_personal.py new file mode 100644 index 0000000..5ec6a85 --- /dev/null +++ b/tests/test_browser_captcha_personal.py @@ -0,0 +1,78 @@ +import types +import unittest +from unittest.mock import AsyncMock + +from src.services.browser_captcha_personal import BrowserCaptchaService, ResidentTabInfo + + +class _FakeTab: + def __init__(self, result): + self._result = result + + async def evaluate(self, expression, await_promise=False, return_by_value=False): + return self._result + + +class BrowserCaptchaPersonalTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.service = BrowserCaptchaService() + + @staticmethod + def _make_remote_object_result(token: str): + return types.SimpleNamespace( + type_="object", + value=None, + deep_serialized_value=types.SimpleNamespace( + type_="object", + value=[ + ["ok", {"type": "boolean", "value": True}], + ["token", {"type": "string", "value": token}], + ], + ), + ) + + async def test_tab_evaluate_normalizes_deep_serialized_remote_object(self): + tab = _FakeTab(self._make_remote_object_result("token-123")) + + result = await self.service._tab_evaluate( + tab, + "ignored", + label="unit_test_tab_evaluate", + await_promise=True, + return_by_value=True, + ) + + self.assertEqual(result, {"ok": True, "token": "token-123"}) + + async def test_execute_recaptcha_on_tab_accepts_remote_object_success_result(self): + tab = _FakeTab(self._make_remote_object_result("token-xyz")) + + token = await self.service._execute_recaptcha_on_tab(tab, action="IMAGE_GENERATION") + + self.assertEqual(token, "token-xyz") + + async def test_create_resident_tab_returns_none_when_browser_missing(self): + self.service.browser = None + + resident_info = await self.service._create_resident_tab("slot-1", project_id="project-1") + + self.assertIsNone(resident_info) + + async def test_restart_browser_for_project_reuses_recent_healthy_runtime(self): + resident_info = ResidentTabInfo(tab=object(), slot_id="slot-1", project_id="project-1") + self.service.browser = types.SimpleNamespace(stopped=False) + self.service._initialized = True + self.service._mark_runtime_restart() + self.service._probe_browser_runtime = AsyncMock(return_value=True) + self.service._ensure_resident_tab = AsyncMock(return_value=("slot-1", resident_info)) + self.service._restart_browser_for_project_unlocked = AsyncMock(return_value=True) + + result = await self.service._restart_browser_for_project("project-1") + + self.assertTrue(result) + self.service._restart_browser_for_project_unlocked.assert_not_awaited() + self.service._ensure_resident_tab.assert_awaited_once() + + +if __name__ == "__main__": + unittest.main()