diff --git a/src/services/browser_captcha.py b/src/services/browser_captcha.py index 63a5b78..46e0699 100644 --- a/src/services/browser_captcha.py +++ b/src/services/browser_captcha.py @@ -5,6 +5,7 @@ import os import sys import subprocess +import signal # 修复 Windows 上 playwright 的 asyncio 兼容性问题 os.environ.setdefault("PLAYWRIGHT_BROWSERS_PATH", "0") @@ -13,7 +14,6 @@ import time import re import random import uuid -from pathlib import Path from typing import Optional, Dict, Any, List, Union from datetime import datetime from urllib.parse import urlparse, unquote, parse_qs @@ -136,13 +136,25 @@ def _ensure_playwright_installed() -> bool: def _ensure_browser_installed() -> bool: """确保 chromium 浏览器已安装""" try: - from playwright.sync_api import sync_playwright - with sync_playwright() as p: - # 尝试获取浏览器路径,如果失败说明未安装 - browser_path = p.chromium.executable_path - if browser_path and os.path.exists(browser_path): - debug_logger.log_info(f"[BrowserCaptcha] chromium 浏览器已安装: {browser_path}") - return True + detect_script = ( + "from playwright.sync_api import sync_playwright\n" + "with sync_playwright() as p:\n" + " print(p.chromium.executable_path or '')\n" + ) + env = os.environ.copy() + env.setdefault("PLAYWRIGHT_BROWSERS_PATH", os.environ.get("PLAYWRIGHT_BROWSERS_PATH", "0") or "0") + result = subprocess.run( + [sys.executable, "-c", detect_script], + capture_output=True, + text=True, + timeout=60, + env=env, + ) + browser_path = (result.stdout or "").strip().splitlines() + browser_path = browser_path[-1].strip() if browser_path else "" + if result.returncode == 0 and browser_path and os.path.exists(browser_path): + debug_logger.log_info(f"[BrowserCaptcha] chromium 浏览器已安装: {browser_path}") + return True except Exception as e: debug_logger.log_info(f"[BrowserCaptcha] 检测浏览器时出错: {e}") @@ -356,31 +368,193 @@ class TokenBrowser: self.token_id = token_id self.user_data_dir = user_data_dir self.db = db - self._semaphore = asyncio.Semaphore(1) # 同时只能有一个任务 + self._semaphore = asyncio.Semaphore(1) # ????????? self._solve_count = 0 self._error_count = 0 self._last_fingerprint: Optional[Dict[str, Any]] = None self._browser_proxy_active = False - # 打码成功后延迟关闭浏览器:等待上游图片/视频请求完成通知 - # request_ref -> {"event": asyncio.Event, "task": asyncio.Task} - # 使用请求级句柄避免高并发下“按顺序 pop”导致的错配关闭。 + # ???????? request_ref ????????????? self._pending_release_entries: Dict[str, Dict[str, Any]] = {} self._pending_release_lock = asyncio.Lock() - - async def _create_browser(self, token_proxy_url: Optional[str] = None) -> tuple: - """创建新浏览器实例(新 UA),返回 (playwright, browser, context)""" - import random - - random_ua = random.choice(self.UA_LIST) + # browser ????????????????? profile??????????? + self._shared_browser_lock = asyncio.Lock() + self._shared_playwright = None + self._shared_browser = None + self._shared_context = None + self._shared_keepalive_page = None + self._shared_browser_pid: Optional[int] = None + self._pid_dir = os.path.join(os.getcwd(), "tmp", "browser_pids") + self._pid_file = os.path.join(self._pid_dir, f"slot_{self.token_id}.pid") + os.makedirs(self._pid_dir, exist_ok=True) + self._shared_proxy_url: Optional[str] = None + self._shared_launch_count = 0 + self._shared_reuse_count = 0 + self._consecutive_browser_failures = 0 + self._refresh_browser_profile() + + def _refresh_browser_profile(self): + """?????????????????????""" base_w, base_h = random.choice(self.RESOLUTIONS) - width, height = base_w, base_h - random.randint(0, 80) - viewport = {"width": width, "height": height} - launch_in_background = bool(getattr(config, "browser_launch_background", True)) - - playwright = await async_playwright().start() - Path(self.user_data_dir).mkdir(parents=True, exist_ok=True) - - # 代理配置 + self._profile_user_agent = random.choice(self.UA_LIST) + self._profile_viewport = { + "width": base_w, + "height": base_h - random.randint(0, 80), + } + + def _get_slot_marker(self) -> str: + return f"--flow2api-browser-slot={self.token_id}" + + def _read_pid_file(self) -> Optional[int]: + try: + if not os.path.exists(self._pid_file): + return None + with open(self._pid_file, 'r', encoding='utf-8') as handle: + raw = (handle.read() or '').strip() + return int(raw or '0') or None + except Exception: + return None + + def _write_pid_file(self, pid: Optional[int]): + self._shared_browser_pid = pid + try: + if pid: + with open(self._pid_file, 'w', encoding='utf-8') as handle: + handle.write(str(pid)) + elif os.path.exists(self._pid_file): + os.remove(self._pid_file) + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ?? PID ????: {e}") + + def _is_pid_running(self, pid: Optional[int]) -> bool: + if not pid: + return False + try: + if sys.platform.startswith('win'): + result = subprocess.run( + ['tasklist', '/FI', f'PID eq {pid}'], + capture_output=True, + text=True, + timeout=10, + ) + return str(pid) in (result.stdout or '') + os.kill(pid, 0) + return True + except Exception: + return False + + def _pid_matches_slot(self, pid: Optional[int]) -> bool: + if not pid: + return False + marker = self._get_slot_marker() + try: + if sys.platform.startswith('win'): + result = subprocess.run( + [ + 'powershell', + '-NoProfile', + '-Command', + f'(Get-CimInstance Win32_Process -Filter "ProcessId = {pid}").CommandLine' + ], + capture_output=True, + text=True, + timeout=15, + ) + command_line = (result.stdout or '').strip() + else: + cmdline_path = f'/proc/{pid}/cmdline' + if not os.path.exists(cmdline_path): + return False + with open(cmdline_path, 'rb') as handle: + command_line = handle.read().decode('utf-8', errors='ignore').replace('\x00', ' ') + return marker in command_line + except Exception: + return False + + async def _wait_pid_exit(self, pid: Optional[int], timeout_seconds: float = 5.0) -> bool: + if not pid: + return True + deadline = time.time() + timeout_seconds + while time.time() < deadline: + if not self._is_pid_running(pid): + return True + await asyncio.sleep(0.2) + return not self._is_pid_running(pid) + + def _kill_pid(self, pid: Optional[int], reason: str): + if not pid: + return + try: + debug_logger.log_warning( + f"[BrowserCaptcha] Token-{self.token_id} ??????????????? PID={pid}, reason={reason}" + ) + if sys.platform.startswith('win'): + subprocess.run( + ['taskkill', '/PID', str(pid), '/T', '/F'], + capture_output=True, + text=True, + timeout=15, + ) + else: + os.kill(pid, signal.SIGKILL) + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ???? PID={pid} ??: {e}") + + async def _cleanup_stale_slot_process(self): + stale_pid = self._read_pid_file() + if not stale_pid: + return + if not self._is_pid_running(stale_pid): + self._write_pid_file(None) + return + if not self._pid_matches_slot(stale_pid): + debug_logger.log_warning( + f"[BrowserCaptcha] Token-{self.token_id} ??? PID ?????????????????? PID={stale_pid}" + ) + self._write_pid_file(None) + return + self._kill_pid(stale_pid, reason='stale_slot_process') + await self._wait_pid_exit(stale_pid, timeout_seconds=3) + self._write_pid_file(None) + + def _extract_browser_pid(self, browser) -> Optional[int]: + candidates = [ + lambda obj: obj._impl_obj._connection._transport._proc.pid, + lambda obj: obj._impl_obj._connection._transport._proc.pid if obj and obj._impl_obj else None, + ] + for getter in candidates: + try: + pid = getter(browser) + if isinstance(pid, int) and pid > 0: + return pid + except Exception: + continue + return None + + async def _ensure_shared_keepalive_page(self): + """??????????????????????????????????""" + keepalive_page = self._shared_keepalive_page + try: + if keepalive_page and not keepalive_page.is_closed(): + return keepalive_page + except Exception: + keepalive_page = None + + if not self._shared_context: + return None + + keepalive_page = await self._shared_context.new_page() + try: + await keepalive_page.goto("about:blank", wait_until="load", timeout=5000) + except Exception: + pass + self._shared_keepalive_page = keepalive_page + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ????????" + ) + return keepalive_page + + async def _resolve_proxy_runtime_config(self, token_proxy_url: Optional[str] = None) -> tuple: + """?????????????????""" proxy_option = None raw_proxy_url = None proxy_source = "none" @@ -405,21 +579,37 @@ class TokenBrowser: raw_proxy_url = normalized_proxy_url self._browser_proxy_active = True debug_logger.log_info( - f"[BrowserCaptcha] Token-{self.token_id} 使用{proxy_source}代理: {proxy_option['server']}" + f"[BrowserCaptcha] Token-{self.token_id} ??{proxy_source}??: {proxy_option['server']}" ) else: debug_logger.log_warning( - f"[BrowserCaptcha] Token-{self.token_id} {proxy_source}代理格式无效,已忽略" + f"[BrowserCaptcha] Token-{self.token_id} {proxy_source}??????????" ) except Exception as e: - debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} 读取代理配置失败: {e}") - - # 先记录创建时的指纹,后续会在页面中补齐 sec-ch-* 等信息 + debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ????????: {e}") + + return proxy_option, raw_proxy_url, proxy_source + + async def _create_browser(self, token_proxy_url: Optional[str] = None, manage_slot_pid: bool = True) -> tuple: + """?????????????????? PID?????????????????""" + random_ua = self._profile_user_agent + width = self._profile_viewport["width"] + height = self._profile_viewport["height"] + viewport = {"width": width, "height": height} + launch_in_background = bool(getattr(config, "browser_launch_background", True)) + + if manage_slot_pid: + await self._cleanup_stale_slot_process() + playwright = await async_playwright().start() + browser_executable_path = os.environ.get("BROWSER_EXECUTABLE_PATH", "").strip() or None + proxy_option, raw_proxy_url, _ = await self._resolve_proxy_runtime_config(token_proxy_url=token_proxy_url) + + # ??????????????????? sec-ch-* ???? self._last_fingerprint = { "user_agent": random_ua, "proxy_url": raw_proxy_url if raw_proxy_url else None, } - + try: browser_args = [ '--disable-blink-features=AutomationControlled', @@ -441,32 +631,128 @@ class TokenBrowser: '--disable-background-timer-throttling', '--disable-renderer-backgrounding', '--disable-backgrounding-occluded-windows', + f'--flow2api-browser-slot={self.token_id}', ]) if sys.platform.startswith("win"): browser_args.append('--window-position=-32000,-32000') debug_logger.log_info( - f"[BrowserCaptcha] Token-{self.token_id} 有头浏览器将以后台模式启动" + f"[BrowserCaptcha] Token-{self.token_id} ?????????????" + ) + + if browser_executable_path: + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ????????????: {browser_executable_path}" ) browser = await playwright.chromium.launch( headless=False, + executable_path=browser_executable_path, proxy=proxy_option, - args=browser_args + args=browser_args, ) context = await browser.new_context( user_agent=random_ua, viewport=viewport, ) + browser_pid = self._extract_browser_pid(browser) + if manage_slot_pid: + self._write_pid_file(browser_pid) + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ????????? (proxy={'yes' if raw_proxy_url else 'no'})" + ) return playwright, browser, context except Exception as e: - debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} 启动浏览器失败: {type(e).__name__}: {str(e)[:200]}") - # 确保清理已创建的对象 + debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} ???????: {type(e).__name__}: {str(e)[:200]}") try: if playwright: await playwright.stop() - except: pass + except Exception: + pass + if manage_slot_pid: + self._write_pid_file(None) raise + async def _recycle_browser_locked(self, reason: str = "unknown", rotate_profile: bool = True): + """???????????????????""" + playwright = self._shared_playwright + browser = self._shared_browser + context = self._shared_context + keepalive_page = self._shared_keepalive_page + browser_pid = self._shared_browser_pid or self._read_pid_file() + had_browser = bool(playwright or browser or context or keepalive_page or browser_pid) + + self._shared_playwright = None + self._shared_browser = None + self._shared_context = None + self._shared_keepalive_page = None + self._shared_browser_pid = None + self._shared_proxy_url = None + self._consecutive_browser_failures = 0 + self._shared_reuse_count = 0 + + if rotate_profile: + self._refresh_browser_profile() + + if had_browser: + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ????????reason={reason}" + ) + await self._close_browser(playwright, browser, context, browser_pid=browser_pid) + + async def recycle_browser(self, reason: str = "unknown", rotate_profile: bool = True): + """????????????""" + async with self._shared_browser_lock: + await self._recycle_browser_locked(reason=reason, rotate_profile=rotate_profile) + + async def _get_or_create_shared_browser(self, token_proxy_url: Optional[str] = None) -> tuple: + """????????????????????????????""" + _, expected_proxy_url, _ = await self._resolve_proxy_runtime_config(token_proxy_url=token_proxy_url) + + async with self._shared_browser_lock: + has_shared_browser = bool(self._shared_playwright and self._shared_browser and self._shared_context) + + if has_shared_browser: + is_connected = True + try: + checker = getattr(self._shared_browser, "is_connected", None) + if callable(checker): + is_connected = bool(checker()) + except Exception: + is_connected = False + + if not is_connected: + await self._recycle_browser_locked(reason="browser_disconnected", rotate_profile=False) + has_shared_browser = False + + if has_shared_browser and self._shared_proxy_url != expected_proxy_url: + # ???????????????????????????????????? + await self._recycle_browser_locked(reason="proxy_changed", rotate_profile=False) + has_shared_browser = False + + if has_shared_browser: + try: + await self._ensure_shared_keepalive_page() + except Exception: + await self._recycle_browser_locked(reason="keepalive_page_broken", rotate_profile=False) + has_shared_browser = False + + if has_shared_browser: + self._shared_reuse_count += 1 + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ??????? (reuse={self._shared_reuse_count})" + ) + return self._shared_playwright, self._shared_browser, self._shared_context + + playwright, browser, context = await self._create_browser(token_proxy_url=token_proxy_url) + self._shared_playwright = playwright + self._shared_browser = browser + self._shared_context = context + await self._ensure_shared_keepalive_page() + self._shared_proxy_url = (self._last_fingerprint or {}).get("proxy_url") + self._shared_launch_count += 1 + self._shared_reuse_count = 0 + return playwright, browser, context + async def _capture_page_fingerprint(self, page): """从浏览器页面提取 UA 与客户端提示头,确保与打码浏览器一致。""" try: @@ -638,20 +924,50 @@ class TokenBrowser: }, } - async def _close_browser(self, playwright, browser, context): - """关闭浏览器实例""" + async def _close_browser( + self, + playwright, + browser, + context, + browser_pid: Optional[int] = None, + clear_slot_pid: bool = True, + ): + """??????????????????????? PID ?????""" + is_shared_browser = any([ + context is not None and context is self._shared_context, + browser is not None and browser is self._shared_browser, + playwright is not None and playwright is self._shared_playwright, + ]) + effective_pid = browser_pid or self._extract_browser_pid(browser) + if clear_slot_pid and not effective_pid: + effective_pid = self._shared_browser_pid or self._read_pid_file() + if is_shared_browser: + self._shared_playwright = None + self._shared_browser = None + self._shared_context = None + self._shared_keepalive_page = None + self._shared_browser_pid = None + self._shared_proxy_url = None try: if context: - await context.close() - except: pass + await asyncio.wait_for(context.close(), timeout=10) + except Exception: + pass try: if browser: - await browser.close() - except: pass + await asyncio.wait_for(browser.close(), timeout=10) + except Exception: + pass try: if playwright: - await playwright.stop() - except: pass + await asyncio.wait_for(playwright.stop(), timeout=10) + except Exception: + pass + if effective_pid and not await self._wait_pid_exit(effective_pid, timeout_seconds=4): + self._kill_pid(effective_pid, reason='close_timeout_or_orphan') + await self._wait_pid_exit(effective_pid, timeout_seconds=2) + if clear_slot_pid: + self._write_pid_file(None) async def _wait_and_close_after_request( self, @@ -748,7 +1064,7 @@ class TokenBrowser: ) async def force_close_pending_browser(self, request_ref: Optional[str] = None, close_all: bool = False): - """强制关闭待释放浏览器(服务关闭时调用)。""" + """????????????????????""" async with self._pending_release_lock: entries: List[Dict[str, Any]] = [] if close_all: @@ -758,7 +1074,6 @@ class TokenBrowser: entry = self._pending_release_entries.pop(request_ref) entries = [entry] elif self._pending_release_entries: - # 兼容旧调用方(无 request_ref)时,仅关闭最早的一项,避免误伤其它并发请求。 first_ref = next(iter(self._pending_release_entries.keys())) entry = self._pending_release_entries.pop(first_ref) entries = [entry] @@ -778,7 +1093,10 @@ class TokenBrowser: await asyncio.wait_for(release_task, timeout=5) except Exception: pass - + + if close_all: + await self.recycle_browser(reason="force_close_all", rotate_profile=False) + async def _execute_captcha(self, context, project_id: str, website_key: str, action: str) -> Optional[str]: """在给定 context 中执行打码逻辑""" page = None @@ -1122,52 +1440,52 @@ class TokenBrowser: action: str = "IMAGE_GENERATION", token_proxy_url: Optional[str] = None ) -> tuple[Optional[str], Optional[str]]: - """获取 Token:启动新浏览器 -> 打码 -> 关闭浏览器""" + """?? Token??????????????? fatal ??????""" async with self._semaphore: - MAX_RETRIES = 3 - - for attempt in range(MAX_RETRIES): - playwright = None - browser = None - context = None + max_retries = 3 + + for attempt in range(max_retries): try: start_ts = time.time() - - # 每次都启动新浏览器(新 UA) - playwright, browser, context = await self._create_browser(token_proxy_url=token_proxy_url) - - # 执行打码 + _, _, context = await self._get_or_create_shared_browser(token_proxy_url=token_proxy_url) + token = await self._execute_captcha(context, project_id, website_key, action) - if token: self._solve_count += 1 - debug_logger.log_info(f"[BrowserCaptcha] Token-{self.token_id} 获取成功 ({(time.time()-start_ts)*1000:.0f}ms)") - # 不立即关闭浏览器:等待图片/视频请求结束后再关闭 - request_ref = await self._defer_browser_close_until_request_done( - playwright=playwright, - browser=browser, - context=context, - action=action, + self._consecutive_browser_failures = 0 + debug_logger.log_info( + f"[BrowserCaptcha] Token-{self.token_id} ???? ({(time.time()-start_ts)*1000:.0f}ms, launches={self._shared_launch_count}, reuse={self._shared_reuse_count})" ) - playwright = None - browser = None - context = None - return token, request_ref - + return token, None + self._error_count += 1 - debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} 尝试 {attempt+1}/{MAX_RETRIES} 失败") - + self._consecutive_browser_failures += 1 + debug_logger.log_warning( + f"[BrowserCaptcha] Token-{self.token_id} ??????? {attempt + 1}/{max_retries} ??" + ) + if self._consecutive_browser_failures >= 2: + await self.recycle_browser(reason=f"captcha_failed_{attempt + 1}", rotate_profile=False) except Exception as e: self._error_count += 1 - debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} 浏览器错误: {type(e).__name__}: {str(e)[:200]}") - finally: - # 无论成功失败都关闭浏览器 - await self._close_browser(playwright, browser, context) - - # 重试前等待 - if attempt < MAX_RETRIES - 1: + self._consecutive_browser_failures += 1 + error_message = f"{type(e).__name__}: {str(e)}" + debug_logger.log_error( + f"[BrowserCaptcha] Token-{self.token_id} ???????: {error_message[:200]}" + ) + error_lower = error_message.lower() + if any(keyword in error_lower for keyword in [ + "context or browser has been closed", + "target closed", + "browser has been closed", + "connection closed", + "crash", + "closed", + ]): + await self.recycle_browser(reason="browser_runtime_error", rotate_profile=False) + + if attempt < max_retries - 1: await asyncio.sleep(1) - + return None, None async def get_custom_token( @@ -1187,7 +1505,7 @@ class TokenBrowser: context = None try: start_ts = time.time() - playwright, browser, context = await self._create_browser() + playwright, browser, context = await self._create_browser(manage_slot_pid=False) token = await self._execute_custom_captcha( context=context, website_url=website_url, @@ -1213,7 +1531,13 @@ class TokenBrowser: f"[BrowserCaptcha] Token-{self.token_id} 自定义浏览器错误: {type(e).__name__}: {str(e)[:200]}" ) finally: - await self._close_browser(playwright, browser, context) + await self._close_browser( + playwright, + browser, + context, + browser_pid=self._extract_browser_pid(browser), + clear_slot_pid=False, + ) if attempt < max_retries - 1: await asyncio.sleep(1) @@ -1238,7 +1562,7 @@ class TokenBrowser: context = None try: started_at = time.time() - playwright, browser, context = await self._create_browser() + playwright, browser, context = await self._create_browser(manage_slot_pid=False) payload = await self._execute_custom_captcha( context=context, website_url=website_url, @@ -1266,7 +1590,13 @@ class TokenBrowser: f"[BrowserCaptcha] Token-{self.token_id} 页面内分数校验异常: {type(e).__name__}: {str(e)[:200]}" ) finally: - await self._close_browser(playwright, browser, context) + await self._close_browser( + playwright, + browser, + context, + browser_pid=self._extract_browser_pid(browser), + clear_slot_pid=False, + ) if attempt < max_retries - 1: await asyncio.sleep(1) @@ -1359,12 +1689,20 @@ class BrowserCaptchaService: await self._load_browser_count() # 如果数量减少,移除多余的浏览器实例 + browsers_to_close: List[TokenBrowser] = [] if self._browser_count < old_count: async with self._browsers_lock: for browser_id in list(self._browsers.keys()): if browser_id >= self._browser_count: - self._browsers.pop(browser_id) + browsers_to_close.append(self._browsers.pop(browser_id)) debug_logger.log_info(f"[BrowserCaptcha] 移除多余浏览器实例 {browser_id}") + + for browser in browsers_to_close: + try: + await browser.force_close_pending_browser(close_all=True) + await browser.recycle_browser(reason="browser_slot_removed", rotate_profile=False) + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] 缩容关闭浏览器实例失败: {e}") def _log_stats(self): total = self._stats["req_total"] @@ -1570,36 +1908,37 @@ class BrowserCaptchaService: return browser.get_last_fingerprint() async def report_error(self, browser_ref: Optional[Union[int, str]] = None, error_reason: Optional[str] = None): - """上层举报当前请求失败,必要时提前回收待释放浏览器。 - - Args: - browser_ref: 浏览器请求句柄(browser_id[:request_ref]) - """ - browser_id, request_ref = self._parse_browser_ref(browser_ref) + """????????????????? reCAPTCHA evaluation failed ????????""" + browser_id, _ = self._parse_browser_ref(browser_ref) async with self._browsers_lock: browser = self._browsers.get(browser_id) if browser_id is not None else None error_lower = (error_reason or "").lower() - if "403" in error_lower or "recaptcha" in error_lower: + has_recaptcha = "recaptcha" in error_lower + should_recycle = has_recaptcha and ( + "evaluation failed" in error_lower + or "????" in error_lower + or "failed" in error_lower + ) + if should_recycle: self._stats["api_403"] += 1 if browser_id is not None: debug_logger.log_info( - f"[BrowserCaptcha] 浏览器 {browser_id} 的 token 验证失败,reason={error_reason or 'unknown'}" + f"[BrowserCaptcha] ??? {browser_id} ????????reason={error_reason or 'unknown'}, recycle={should_recycle}" ) - if browser: + if browser and should_recycle: try: - if request_ref: - await browser.force_close_pending_browser(request_ref=request_ref) - else: - # 未携带 request_ref 时只回收一项,避免高并发下误关其它请求链路。 - await browser.force_close_pending_browser() + await browser.recycle_browser( + reason=error_reason or "recaptcha_evaluation_failed", + rotate_profile=True, + ) except Exception as e: - debug_logger.log_warning(f"[BrowserCaptcha] 浏览器 {browser_id} 失败后提前关闭异常: {e}") + debug_logger.log_warning(f"[BrowserCaptcha] ??? {browser_id} ????: {e}") async def report_request_finished(self, browser_ref: Optional[Union[int, str]] = None): - """上层通知:图片/视频请求已完成,可关闭对应打码浏览器。""" - browser_id, request_ref = self._parse_browser_ref(browser_ref) + """上层通知本次请求已完成;browser 模式仅保留常驻浏览器,不在成功后主动关闭。""" + browser_id, _ = self._parse_browser_ref(browser_ref) if browser_id is None: return @@ -1607,7 +1946,15 @@ class BrowserCaptchaService: browser = self._browsers.get(browser_id) if browser: - await browser.notify_generation_request_finished(request_ref=request_ref) + keepalive_alive = False + keepalive_page = getattr(browser, '_shared_keepalive_page', None) + try: + keepalive_alive = bool(keepalive_page and not keepalive_page.is_closed()) + except Exception: + keepalive_alive = False + debug_logger.log_info( + f"[BrowserCaptcha] ??? {browser_id} ???????????????????? keepalive_alive={keepalive_alive}" + ) async def remove_browser(self, browser_id: int): async with self._browsers_lock: @@ -1622,6 +1969,7 @@ class BrowserCaptchaService: for browser in browsers: try: await browser.force_close_pending_browser(close_all=True) + await browser.recycle_browser(reason="service_shutdown", rotate_profile=False) except Exception: pass diff --git a/src/services/flow_client.py b/src/services/flow_client.py index 6182cca..0eb7294 100644 --- a/src/services/flow_client.py +++ b/src/services/flow_client.py @@ -1795,16 +1795,17 @@ class FlowClient: """统一处理生成链路的重试判定与打码自愈通知。""" error_str = str(error) retry_reason = self._get_retry_reason(error_str) + notify_reason = retry_reason or error_str[:120] or type(error).__name__ + await self._notify_browser_captcha_error( + browser_id=browser_id, + project_id=project_id, + error_reason=notify_reason, + error_message=error_str, + ) if not retry_reason: return False is_terminal_attempt = retry_attempt >= max_retries - 1 - await self._notify_browser_captcha_error( - browser_id=browser_id, - project_id=project_id, - error_reason=retry_reason, - error_message=error_str, - ) if is_terminal_attempt: debug_logger.log_warning( @@ -1879,7 +1880,10 @@ class FlowClient: try: from .browser_captcha import BrowserCaptchaService service = await BrowserCaptchaService.get_instance(self.db) - await service.report_error(browser_id, error_reason=error_reason) + await service.report_error( + browser_id, + error_reason=error_reason or error_message or "upstream_error" + ) except Exception: pass elif config.captcha_method == "personal" and project_id: