feat: keep headed browser sessions alive

This commit is contained in:
genz27
2026-03-07 23:00:33 +08:00
parent a756954e33
commit c2ec39e2ed
2 changed files with 465 additions and 113 deletions

View File

@@ -5,6 +5,7 @@
import os
import sys
import subprocess
import signal
# 修复 Windows 上 playwright 的 asyncio 兼容性问题
os.environ.setdefault("PLAYWRIGHT_BROWSERS_PATH", "0")
@@ -13,7 +14,6 @@ import time
import re
import random
import uuid
from pathlib import Path
from typing import Optional, Dict, Any, List, Union
from datetime import datetime
from urllib.parse import urlparse, unquote, parse_qs
@@ -136,13 +136,25 @@ def _ensure_playwright_installed() -> bool:
def _ensure_browser_installed() -> bool:
"""确保 chromium 浏览器已安装"""
try:
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
# 尝试获取浏览器路径,如果失败说明未安装
browser_path = p.chromium.executable_path
if browser_path and os.path.exists(browser_path):
debug_logger.log_info(f"[BrowserCaptcha] chromium 浏览器已安装: {browser_path}")
return True
detect_script = (
"from playwright.sync_api import sync_playwright\n"
"with sync_playwright() as p:\n"
" print(p.chromium.executable_path or '')\n"
)
env = os.environ.copy()
env.setdefault("PLAYWRIGHT_BROWSERS_PATH", os.environ.get("PLAYWRIGHT_BROWSERS_PATH", "0") or "0")
result = subprocess.run(
[sys.executable, "-c", detect_script],
capture_output=True,
text=True,
timeout=60,
env=env,
)
browser_path = (result.stdout or "").strip().splitlines()
browser_path = browser_path[-1].strip() if browser_path else ""
if result.returncode == 0 and browser_path and os.path.exists(browser_path):
debug_logger.log_info(f"[BrowserCaptcha] chromium 浏览器已安装: {browser_path}")
return True
except Exception as e:
debug_logger.log_info(f"[BrowserCaptcha] 检测浏览器时出错: {e}")
@@ -356,31 +368,193 @@ class TokenBrowser:
self.token_id = token_id
self.user_data_dir = user_data_dir
self.db = db
self._semaphore = asyncio.Semaphore(1) # 同时只能有一个任务
self._semaphore = asyncio.Semaphore(1) # ?????????
self._solve_count = 0
self._error_count = 0
self._last_fingerprint: Optional[Dict[str, Any]] = None
self._browser_proxy_active = False
# 打码成功后延迟关闭浏览器:等待上游图片/视频请求完成通知
# request_ref -> {"event": asyncio.Event, "task": asyncio.Task}
# 使用请求级句柄避免高并发下“按顺序 pop”导致的错配关闭。
# ???????? request_ref ?????????????
self._pending_release_entries: Dict[str, Dict[str, Any]] = {}
self._pending_release_lock = asyncio.Lock()
async def _create_browser(self, token_proxy_url: Optional[str] = None) -> tuple:
"""创建新浏览器实例(新 UA返回 (playwright, browser, context)"""
import random
random_ua = random.choice(self.UA_LIST)
# browser ????????????????? profile???????????
self._shared_browser_lock = asyncio.Lock()
self._shared_playwright = None
self._shared_browser = None
self._shared_context = None
self._shared_keepalive_page = None
self._shared_browser_pid: Optional[int] = None
self._pid_dir = os.path.join(os.getcwd(), "tmp", "browser_pids")
self._pid_file = os.path.join(self._pid_dir, f"slot_{self.token_id}.pid")
os.makedirs(self._pid_dir, exist_ok=True)
self._shared_proxy_url: Optional[str] = None
self._shared_launch_count = 0
self._shared_reuse_count = 0
self._consecutive_browser_failures = 0
self._refresh_browser_profile()
def _refresh_browser_profile(self):
"""?????????????????????"""
base_w, base_h = random.choice(self.RESOLUTIONS)
width, height = base_w, base_h - random.randint(0, 80)
viewport = {"width": width, "height": height}
launch_in_background = bool(getattr(config, "browser_launch_background", True))
playwright = await async_playwright().start()
Path(self.user_data_dir).mkdir(parents=True, exist_ok=True)
# 代理配置
self._profile_user_agent = random.choice(self.UA_LIST)
self._profile_viewport = {
"width": base_w,
"height": base_h - random.randint(0, 80),
}
def _get_slot_marker(self) -> str:
return f"--flow2api-browser-slot={self.token_id}"
def _read_pid_file(self) -> Optional[int]:
try:
if not os.path.exists(self._pid_file):
return None
with open(self._pid_file, 'r', encoding='utf-8') as handle:
raw = (handle.read() or '').strip()
return int(raw or '0') or None
except Exception:
return None
def _write_pid_file(self, pid: Optional[int]):
self._shared_browser_pid = pid
try:
if pid:
with open(self._pid_file, 'w', encoding='utf-8') as handle:
handle.write(str(pid))
elif os.path.exists(self._pid_file):
os.remove(self._pid_file)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ?? PID ????: {e}")
def _is_pid_running(self, pid: Optional[int]) -> bool:
if not pid:
return False
try:
if sys.platform.startswith('win'):
result = subprocess.run(
['tasklist', '/FI', f'PID eq {pid}'],
capture_output=True,
text=True,
timeout=10,
)
return str(pid) in (result.stdout or '')
os.kill(pid, 0)
return True
except Exception:
return False
def _pid_matches_slot(self, pid: Optional[int]) -> bool:
if not pid:
return False
marker = self._get_slot_marker()
try:
if sys.platform.startswith('win'):
result = subprocess.run(
[
'powershell',
'-NoProfile',
'-Command',
f'(Get-CimInstance Win32_Process -Filter "ProcessId = {pid}").CommandLine'
],
capture_output=True,
text=True,
timeout=15,
)
command_line = (result.stdout or '').strip()
else:
cmdline_path = f'/proc/{pid}/cmdline'
if not os.path.exists(cmdline_path):
return False
with open(cmdline_path, 'rb') as handle:
command_line = handle.read().decode('utf-8', errors='ignore').replace('\x00', ' ')
return marker in command_line
except Exception:
return False
async def _wait_pid_exit(self, pid: Optional[int], timeout_seconds: float = 5.0) -> bool:
if not pid:
return True
deadline = time.time() + timeout_seconds
while time.time() < deadline:
if not self._is_pid_running(pid):
return True
await asyncio.sleep(0.2)
return not self._is_pid_running(pid)
def _kill_pid(self, pid: Optional[int], reason: str):
if not pid:
return
try:
debug_logger.log_warning(
f"[BrowserCaptcha] Token-{self.token_id} ??????????????? PID={pid}, reason={reason}"
)
if sys.platform.startswith('win'):
subprocess.run(
['taskkill', '/PID', str(pid), '/T', '/F'],
capture_output=True,
text=True,
timeout=15,
)
else:
os.kill(pid, signal.SIGKILL)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ???? PID={pid} ??: {e}")
async def _cleanup_stale_slot_process(self):
stale_pid = self._read_pid_file()
if not stale_pid:
return
if not self._is_pid_running(stale_pid):
self._write_pid_file(None)
return
if not self._pid_matches_slot(stale_pid):
debug_logger.log_warning(
f"[BrowserCaptcha] Token-{self.token_id} ??? PID ?????????????????? PID={stale_pid}"
)
self._write_pid_file(None)
return
self._kill_pid(stale_pid, reason='stale_slot_process')
await self._wait_pid_exit(stale_pid, timeout_seconds=3)
self._write_pid_file(None)
def _extract_browser_pid(self, browser) -> Optional[int]:
candidates = [
lambda obj: obj._impl_obj._connection._transport._proc.pid,
lambda obj: obj._impl_obj._connection._transport._proc.pid if obj and obj._impl_obj else None,
]
for getter in candidates:
try:
pid = getter(browser)
if isinstance(pid, int) and pid > 0:
return pid
except Exception:
continue
return None
async def _ensure_shared_keepalive_page(self):
"""??????????????????????????????????"""
keepalive_page = self._shared_keepalive_page
try:
if keepalive_page and not keepalive_page.is_closed():
return keepalive_page
except Exception:
keepalive_page = None
if not self._shared_context:
return None
keepalive_page = await self._shared_context.new_page()
try:
await keepalive_page.goto("about:blank", wait_until="load", timeout=5000)
except Exception:
pass
self._shared_keepalive_page = keepalive_page
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ????????"
)
return keepalive_page
async def _resolve_proxy_runtime_config(self, token_proxy_url: Optional[str] = None) -> tuple:
"""?????????????????"""
proxy_option = None
raw_proxy_url = None
proxy_source = "none"
@@ -405,21 +579,37 @@ class TokenBrowser:
raw_proxy_url = normalized_proxy_url
self._browser_proxy_active = True
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} 使用{proxy_source}代理: {proxy_option['server']}"
f"[BrowserCaptcha] Token-{self.token_id} ??{proxy_source}??: {proxy_option['server']}"
)
else:
debug_logger.log_warning(
f"[BrowserCaptcha] Token-{self.token_id} {proxy_source}代理格式无效,已忽略"
f"[BrowserCaptcha] Token-{self.token_id} {proxy_source}??????????"
)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} 读取代理配置失败: {e}")
# 先记录创建时的指纹,后续会在页面中补齐 sec-ch-* 等信息
debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} ????????: {e}")
return proxy_option, raw_proxy_url, proxy_source
async def _create_browser(self, token_proxy_url: Optional[str] = None, manage_slot_pid: bool = True) -> tuple:
"""?????????????????? PID?????????????????"""
random_ua = self._profile_user_agent
width = self._profile_viewport["width"]
height = self._profile_viewport["height"]
viewport = {"width": width, "height": height}
launch_in_background = bool(getattr(config, "browser_launch_background", True))
if manage_slot_pid:
await self._cleanup_stale_slot_process()
playwright = await async_playwright().start()
browser_executable_path = os.environ.get("BROWSER_EXECUTABLE_PATH", "").strip() or None
proxy_option, raw_proxy_url, _ = await self._resolve_proxy_runtime_config(token_proxy_url=token_proxy_url)
# ??????????????????? sec-ch-* ????
self._last_fingerprint = {
"user_agent": random_ua,
"proxy_url": raw_proxy_url if raw_proxy_url else None,
}
try:
browser_args = [
'--disable-blink-features=AutomationControlled',
@@ -441,32 +631,128 @@ class TokenBrowser:
'--disable-background-timer-throttling',
'--disable-renderer-backgrounding',
'--disable-backgrounding-occluded-windows',
f'--flow2api-browser-slot={self.token_id}',
])
if sys.platform.startswith("win"):
browser_args.append('--window-position=-32000,-32000')
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} 有头浏览器将以后台模式启动"
f"[BrowserCaptcha] Token-{self.token_id} ?????????????"
)
if browser_executable_path:
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ????????????: {browser_executable_path}"
)
browser = await playwright.chromium.launch(
headless=False,
executable_path=browser_executable_path,
proxy=proxy_option,
args=browser_args
args=browser_args,
)
context = await browser.new_context(
user_agent=random_ua,
viewport=viewport,
)
browser_pid = self._extract_browser_pid(browser)
if manage_slot_pid:
self._write_pid_file(browser_pid)
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ????????? (proxy={'yes' if raw_proxy_url else 'no'})"
)
return playwright, browser, context
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} 启动浏览器失败: {type(e).__name__}: {str(e)[:200]}")
# 确保清理已创建的对象
debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} ???????: {type(e).__name__}: {str(e)[:200]}")
try:
if playwright:
await playwright.stop()
except: pass
except Exception:
pass
if manage_slot_pid:
self._write_pid_file(None)
raise
async def _recycle_browser_locked(self, reason: str = "unknown", rotate_profile: bool = True):
"""???????????????????"""
playwright = self._shared_playwright
browser = self._shared_browser
context = self._shared_context
keepalive_page = self._shared_keepalive_page
browser_pid = self._shared_browser_pid or self._read_pid_file()
had_browser = bool(playwright or browser or context or keepalive_page or browser_pid)
self._shared_playwright = None
self._shared_browser = None
self._shared_context = None
self._shared_keepalive_page = None
self._shared_browser_pid = None
self._shared_proxy_url = None
self._consecutive_browser_failures = 0
self._shared_reuse_count = 0
if rotate_profile:
self._refresh_browser_profile()
if had_browser:
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ????????reason={reason}"
)
await self._close_browser(playwright, browser, context, browser_pid=browser_pid)
async def recycle_browser(self, reason: str = "unknown", rotate_profile: bool = True):
"""????????????"""
async with self._shared_browser_lock:
await self._recycle_browser_locked(reason=reason, rotate_profile=rotate_profile)
async def _get_or_create_shared_browser(self, token_proxy_url: Optional[str] = None) -> tuple:
"""????????????????????????????"""
_, expected_proxy_url, _ = await self._resolve_proxy_runtime_config(token_proxy_url=token_proxy_url)
async with self._shared_browser_lock:
has_shared_browser = bool(self._shared_playwright and self._shared_browser and self._shared_context)
if has_shared_browser:
is_connected = True
try:
checker = getattr(self._shared_browser, "is_connected", None)
if callable(checker):
is_connected = bool(checker())
except Exception:
is_connected = False
if not is_connected:
await self._recycle_browser_locked(reason="browser_disconnected", rotate_profile=False)
has_shared_browser = False
if has_shared_browser and self._shared_proxy_url != expected_proxy_url:
# ????????????????????????????????????
await self._recycle_browser_locked(reason="proxy_changed", rotate_profile=False)
has_shared_browser = False
if has_shared_browser:
try:
await self._ensure_shared_keepalive_page()
except Exception:
await self._recycle_browser_locked(reason="keepalive_page_broken", rotate_profile=False)
has_shared_browser = False
if has_shared_browser:
self._shared_reuse_count += 1
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ??????? (reuse={self._shared_reuse_count})"
)
return self._shared_playwright, self._shared_browser, self._shared_context
playwright, browser, context = await self._create_browser(token_proxy_url=token_proxy_url)
self._shared_playwright = playwright
self._shared_browser = browser
self._shared_context = context
await self._ensure_shared_keepalive_page()
self._shared_proxy_url = (self._last_fingerprint or {}).get("proxy_url")
self._shared_launch_count += 1
self._shared_reuse_count = 0
return playwright, browser, context
async def _capture_page_fingerprint(self, page):
"""从浏览器页面提取 UA 与客户端提示头,确保与打码浏览器一致。"""
try:
@@ -638,20 +924,50 @@ class TokenBrowser:
},
}
async def _close_browser(self, playwright, browser, context):
"""关闭浏览器实例"""
async def _close_browser(
self,
playwright,
browser,
context,
browser_pid: Optional[int] = None,
clear_slot_pid: bool = True,
):
"""??????????????????????? PID ?????"""
is_shared_browser = any([
context is not None and context is self._shared_context,
browser is not None and browser is self._shared_browser,
playwright is not None and playwright is self._shared_playwright,
])
effective_pid = browser_pid or self._extract_browser_pid(browser)
if clear_slot_pid and not effective_pid:
effective_pid = self._shared_browser_pid or self._read_pid_file()
if is_shared_browser:
self._shared_playwright = None
self._shared_browser = None
self._shared_context = None
self._shared_keepalive_page = None
self._shared_browser_pid = None
self._shared_proxy_url = None
try:
if context:
await context.close()
except: pass
await asyncio.wait_for(context.close(), timeout=10)
except Exception:
pass
try:
if browser:
await browser.close()
except: pass
await asyncio.wait_for(browser.close(), timeout=10)
except Exception:
pass
try:
if playwright:
await playwright.stop()
except: pass
await asyncio.wait_for(playwright.stop(), timeout=10)
except Exception:
pass
if effective_pid and not await self._wait_pid_exit(effective_pid, timeout_seconds=4):
self._kill_pid(effective_pid, reason='close_timeout_or_orphan')
await self._wait_pid_exit(effective_pid, timeout_seconds=2)
if clear_slot_pid:
self._write_pid_file(None)
async def _wait_and_close_after_request(
self,
@@ -748,7 +1064,7 @@ class TokenBrowser:
)
async def force_close_pending_browser(self, request_ref: Optional[str] = None, close_all: bool = False):
"""强制关闭待释放浏览器(服务关闭时调用)。"""
"""????????????????????"""
async with self._pending_release_lock:
entries: List[Dict[str, Any]] = []
if close_all:
@@ -758,7 +1074,6 @@ class TokenBrowser:
entry = self._pending_release_entries.pop(request_ref)
entries = [entry]
elif self._pending_release_entries:
# 兼容旧调用方(无 request_ref仅关闭最早的一项避免误伤其它并发请求。
first_ref = next(iter(self._pending_release_entries.keys()))
entry = self._pending_release_entries.pop(first_ref)
entries = [entry]
@@ -778,7 +1093,10 @@ class TokenBrowser:
await asyncio.wait_for(release_task, timeout=5)
except Exception:
pass
if close_all:
await self.recycle_browser(reason="force_close_all", rotate_profile=False)
async def _execute_captcha(self, context, project_id: str, website_key: str, action: str) -> Optional[str]:
"""在给定 context 中执行打码逻辑"""
page = None
@@ -1122,52 +1440,52 @@ class TokenBrowser:
action: str = "IMAGE_GENERATION",
token_proxy_url: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
"""获取 Token:启动新浏览器 -> 打码 -> 关闭浏览器"""
"""?? Token??????????????? fatal ??????"""
async with self._semaphore:
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
playwright = None
browser = None
context = None
max_retries = 3
for attempt in range(max_retries):
try:
start_ts = time.time()
# 每次都启动新浏览器(新 UA
playwright, browser, context = await self._create_browser(token_proxy_url=token_proxy_url)
# 执行打码
_, _, context = await self._get_or_create_shared_browser(token_proxy_url=token_proxy_url)
token = await self._execute_captcha(context, project_id, website_key, action)
if token:
self._solve_count += 1
debug_logger.log_info(f"[BrowserCaptcha] Token-{self.token_id} 获取成功 ({(time.time()-start_ts)*1000:.0f}ms)")
# 不立即关闭浏览器:等待图片/视频请求结束后再关闭
request_ref = await self._defer_browser_close_until_request_done(
playwright=playwright,
browser=browser,
context=context,
action=action,
self._consecutive_browser_failures = 0
debug_logger.log_info(
f"[BrowserCaptcha] Token-{self.token_id} ???? ({(time.time()-start_ts)*1000:.0f}ms, launches={self._shared_launch_count}, reuse={self._shared_reuse_count})"
)
playwright = None
browser = None
context = None
return token, request_ref
return token, None
self._error_count += 1
debug_logger.log_warning(f"[BrowserCaptcha] Token-{self.token_id} 尝试 {attempt+1}/{MAX_RETRIES} 失败")
self._consecutive_browser_failures += 1
debug_logger.log_warning(
f"[BrowserCaptcha] Token-{self.token_id} ??????? {attempt + 1}/{max_retries} ??"
)
if self._consecutive_browser_failures >= 2:
await self.recycle_browser(reason=f"captcha_failed_{attempt + 1}", rotate_profile=False)
except Exception as e:
self._error_count += 1
debug_logger.log_error(f"[BrowserCaptcha] Token-{self.token_id} 浏览器错误: {type(e).__name__}: {str(e)[:200]}")
finally:
# 无论成功失败都关闭浏览器
await self._close_browser(playwright, browser, context)
# 重试前等待
if attempt < MAX_RETRIES - 1:
self._consecutive_browser_failures += 1
error_message = f"{type(e).__name__}: {str(e)}"
debug_logger.log_error(
f"[BrowserCaptcha] Token-{self.token_id} ???????: {error_message[:200]}"
)
error_lower = error_message.lower()
if any(keyword in error_lower for keyword in [
"context or browser has been closed",
"target closed",
"browser has been closed",
"connection closed",
"crash",
"closed",
]):
await self.recycle_browser(reason="browser_runtime_error", rotate_profile=False)
if attempt < max_retries - 1:
await asyncio.sleep(1)
return None, None
async def get_custom_token(
@@ -1187,7 +1505,7 @@ class TokenBrowser:
context = None
try:
start_ts = time.time()
playwright, browser, context = await self._create_browser()
playwright, browser, context = await self._create_browser(manage_slot_pid=False)
token = await self._execute_custom_captcha(
context=context,
website_url=website_url,
@@ -1213,7 +1531,13 @@ class TokenBrowser:
f"[BrowserCaptcha] Token-{self.token_id} 自定义浏览器错误: {type(e).__name__}: {str(e)[:200]}"
)
finally:
await self._close_browser(playwright, browser, context)
await self._close_browser(
playwright,
browser,
context,
browser_pid=self._extract_browser_pid(browser),
clear_slot_pid=False,
)
if attempt < max_retries - 1:
await asyncio.sleep(1)
@@ -1238,7 +1562,7 @@ class TokenBrowser:
context = None
try:
started_at = time.time()
playwright, browser, context = await self._create_browser()
playwright, browser, context = await self._create_browser(manage_slot_pid=False)
payload = await self._execute_custom_captcha(
context=context,
website_url=website_url,
@@ -1266,7 +1590,13 @@ class TokenBrowser:
f"[BrowserCaptcha] Token-{self.token_id} 页面内分数校验异常: {type(e).__name__}: {str(e)[:200]}"
)
finally:
await self._close_browser(playwright, browser, context)
await self._close_browser(
playwright,
browser,
context,
browser_pid=self._extract_browser_pid(browser),
clear_slot_pid=False,
)
if attempt < max_retries - 1:
await asyncio.sleep(1)
@@ -1359,12 +1689,20 @@ class BrowserCaptchaService:
await self._load_browser_count()
# 如果数量减少,移除多余的浏览器实例
browsers_to_close: List[TokenBrowser] = []
if self._browser_count < old_count:
async with self._browsers_lock:
for browser_id in list(self._browsers.keys()):
if browser_id >= self._browser_count:
self._browsers.pop(browser_id)
browsers_to_close.append(self._browsers.pop(browser_id))
debug_logger.log_info(f"[BrowserCaptcha] 移除多余浏览器实例 {browser_id}")
for browser in browsers_to_close:
try:
await browser.force_close_pending_browser(close_all=True)
await browser.recycle_browser(reason="browser_slot_removed", rotate_profile=False)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] 缩容关闭浏览器实例失败: {e}")
def _log_stats(self):
total = self._stats["req_total"]
@@ -1570,36 +1908,37 @@ class BrowserCaptchaService:
return browser.get_last_fingerprint()
async def report_error(self, browser_ref: Optional[Union[int, str]] = None, error_reason: Optional[str] = None):
"""上层举报当前请求失败,必要时提前回收待释放浏览器。
Args:
browser_ref: 浏览器请求句柄browser_id[:request_ref]
"""
browser_id, request_ref = self._parse_browser_ref(browser_ref)
"""????????????????? reCAPTCHA evaluation failed ????????"""
browser_id, _ = self._parse_browser_ref(browser_ref)
async with self._browsers_lock:
browser = self._browsers.get(browser_id) if browser_id is not None else None
error_lower = (error_reason or "").lower()
if "403" in error_lower or "recaptcha" in error_lower:
has_recaptcha = "recaptcha" in error_lower
should_recycle = has_recaptcha and (
"evaluation failed" in error_lower
or "????" in error_lower
or "failed" in error_lower
)
if should_recycle:
self._stats["api_403"] += 1
if browser_id is not None:
debug_logger.log_info(
f"[BrowserCaptcha] 浏览器 {browser_id} 的 token 验证失败,reason={error_reason or 'unknown'}"
f"[BrowserCaptcha] ??? {browser_id} ????????reason={error_reason or 'unknown'}, recycle={should_recycle}"
)
if browser:
if browser and should_recycle:
try:
if request_ref:
await browser.force_close_pending_browser(request_ref=request_ref)
else:
# 未携带 request_ref 时只回收一项,避免高并发下误关其它请求链路。
await browser.force_close_pending_browser()
await browser.recycle_browser(
reason=error_reason or "recaptcha_evaluation_failed",
rotate_profile=True,
)
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] 浏览器 {browser_id} 失败后提前关闭异常: {e}")
debug_logger.log_warning(f"[BrowserCaptcha] ??? {browser_id} ????: {e}")
async def report_request_finished(self, browser_ref: Optional[Union[int, str]] = None):
"""上层通知:图片/视频请求已完成,可关闭对应打码浏览器"""
browser_id, request_ref = self._parse_browser_ref(browser_ref)
"""上层通知本次请求已完成browser 模式仅保留常驻浏览器,不在成功后主动关闭"""
browser_id, _ = self._parse_browser_ref(browser_ref)
if browser_id is None:
return
@@ -1607,7 +1946,15 @@ class BrowserCaptchaService:
browser = self._browsers.get(browser_id)
if browser:
await browser.notify_generation_request_finished(request_ref=request_ref)
keepalive_alive = False
keepalive_page = getattr(browser, '_shared_keepalive_page', None)
try:
keepalive_alive = bool(keepalive_page and not keepalive_page.is_closed())
except Exception:
keepalive_alive = False
debug_logger.log_info(
f"[BrowserCaptcha] ??? {browser_id} ???????????????????? keepalive_alive={keepalive_alive}"
)
async def remove_browser(self, browser_id: int):
async with self._browsers_lock:
@@ -1622,6 +1969,7 @@ class BrowserCaptchaService:
for browser in browsers:
try:
await browser.force_close_pending_browser(close_all=True)
await browser.recycle_browser(reason="service_shutdown", rotate_profile=False)
except Exception:
pass

View File

@@ -1795,16 +1795,17 @@ class FlowClient:
"""统一处理生成链路的重试判定与打码自愈通知。"""
error_str = str(error)
retry_reason = self._get_retry_reason(error_str)
notify_reason = retry_reason or error_str[:120] or type(error).__name__
await self._notify_browser_captcha_error(
browser_id=browser_id,
project_id=project_id,
error_reason=notify_reason,
error_message=error_str,
)
if not retry_reason:
return False
is_terminal_attempt = retry_attempt >= max_retries - 1
await self._notify_browser_captcha_error(
browser_id=browser_id,
project_id=project_id,
error_reason=retry_reason,
error_message=error_str,
)
if is_terminal_attempt:
debug_logger.log_warning(
@@ -1879,7 +1880,10 @@ class FlowClient:
try:
from .browser_captcha import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(self.db)
await service.report_error(browser_id, error_reason=error_reason)
await service.report_error(
browser_id,
error_reason=error_reason or error_message or "upstream_error"
)
except Exception:
pass
elif config.captcha_method == "personal" and project_id: