From 36f8cfc54c032be887b47adeb0dc02cf6f97970b Mon Sep 17 00:00:00 2001 From: genz27 Date: Fri, 17 Apr 2026 15:21:15 +0800 Subject: [PATCH] =?UTF-8?q?fix(browser):=20=E4=BF=AE=E5=A4=8D=E5=86=85?= =?UTF-8?q?=E7=BD=AE=E6=9C=89=E5=A4=B4=E6=B5=8F=E8=A7=88=E5=99=A8=E4=B8=8E?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E7=AB=AF=E5=8F=A3=E5=8D=A0=E6=BB=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile.headed | 10 + README.md | 1 + config/setting_example.toml | 1 + docker-compose.headed.yml | 3 + docker/entrypoint.headed.sh | 31 +- src/main.py | 6 + src/services/browser_captcha_personal.py | 46 ++- src/services/flow_client.py | 422 +++++++++++++++++------ tests/testflow_client_launch_gate.py | 68 ++++ 9 files changed, 480 insertions(+), 108 deletions(-) create mode 100644 tests/testflow_client_launch_gate.py diff --git a/Dockerfile.headed b/Dockerfile.headed index 6b0f25f..adfa4b5 100644 --- a/Dockerfile.headed +++ b/Dockerfile.headed @@ -5,8 +5,18 @@ WORKDIR /app ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 \ ALLOW_DOCKER_HEADED_CAPTCHA=true \ + DISPLAY=:99 \ + PERSONAL_BROWSER_HEADLESS=0 \ PLAYWRIGHT_BROWSERS_PATH=0 +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + fluxbox \ + x11-utils \ + xauth \ + xvfb \ + && rm -rf /var/lib/apt/lists/* + # 在镜像构建阶段预装 Playwright Chromium,供 personal/browser 模式复用 COPY requirements.txt ./ diff --git a/README.md b/README.md index acfd382..98be886 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ docker-compose -f docker-compose.warp.yml logs -f > 适用于你有虚拟化桌面需求、希望在容器里启用有头浏览器打码的场景。 > 该模式默认启动 `Xvfb + Fluxbox` 实现容器内部可视化,并设置 `ALLOW_DOCKER_HEADED_CAPTCHA=true`。 > 仅开放应用端口,不提供任何远程桌面连接端口。 +> `personal` 内置浏览器现在默认按有头模式启动;如需临时切回无头,可额外设置环境变量 `PERSONAL_BROWSER_HEADLESS=true`。 ```bash # 启动有头模式(首次建议带 --build) diff --git a/config/setting_example.toml b/config/setting_example.toml index d284aae..0d4f1b7 100644 --- a/config/setting_example.toml +++ b/config/setting_example.toml @@ -55,6 +55,7 @@ base_url = "" # 缓存文件访问的基础URL, 留空则使用服务器地址 [captcha] captcha_method = "browser" # 打码方式: yescaptcha/browser/personal/remote_browser +browser_launch_background = true # 有头浏览器是否默认后台启动;设为 false 可直接看到窗口 browser_recaptcha_settle_seconds = 3.0 # reload/clr 就绪后的额外稳态等待 browser_count = 1 # browser 模式的有头浏览器实例数量 personal_project_pool_size = 4 # personal 模式下单个 Token 默认维护的项目池数量(仅影响项目轮换,不决定打码标签页数量) diff --git a/docker-compose.headed.yml b/docker-compose.headed.yml index 67f6a7c..30a16d4 100644 --- a/docker-compose.headed.yml +++ b/docker-compose.headed.yml @@ -2,6 +2,7 @@ version: '3.8' services: flow2api-headed: + init: true build: context: . dockerfile: Dockerfile.headed @@ -16,5 +17,7 @@ services: environment: - PYTHONUNBUFFERED=1 - ALLOW_DOCKER_HEADED_CAPTCHA=true + - DISPLAY=:99 + - XVFB_SCREEN=1440x900x24 shm_size: "2gb" restart: unless-stopped diff --git a/docker/entrypoint.headed.sh b/docker/entrypoint.headed.sh index aa6518a..dde159e 100644 --- a/docker/entrypoint.headed.sh +++ b/docker/entrypoint.headed.sh @@ -1,6 +1,10 @@ #!/bin/sh set -eu +DISPLAY_VALUE="${DISPLAY:-:99}" +XVFB_SCREEN_VALUE="${XVFB_SCREEN:-1440x900x24}" +export DISPLAY="${DISPLAY_VALUE}" + resolve_browser_path() { python - <<'PY' from playwright.sync_api import sync_playwright @@ -17,7 +21,32 @@ if [ -z "${BROWSER_EXECUTABLE_PATH:-}" ] || [ ! -x "${BROWSER_EXECUTABLE_PATH:-} fi fi -echo "[entrypoint] starting flow2api (headless browser mode)" +if [ "${ALLOW_DOCKER_HEADED_CAPTCHA:-true}" = "true" ] || [ "${ALLOW_DOCKER_HEADED_CAPTCHA:-1}" = "1" ]; then + display_suffix="$(printf '%s' "${DISPLAY}" | sed 's/^://; s/\..*$//')" + socket_path="/tmp/.X11-unix/X${display_suffix}" + + mkdir -p /tmp/.X11-unix + rm -f "/tmp/.X${display_suffix}-lock" + + echo "[entrypoint] starting Xvfb on DISPLAY=${DISPLAY} (${XVFB_SCREEN_VALUE})" + Xvfb "${DISPLAY}" -screen 0 "${XVFB_SCREEN_VALUE}" -ac +extension RANDR >/tmp/xvfb.log 2>&1 & + + waited=0 + while [ ! -S "${socket_path}" ] && [ "${waited}" -lt 100 ]; do + sleep 0.1 + waited=$((waited + 1)) + done + + if [ ! -S "${socket_path}" ]; then + echo "[entrypoint] failed to start Xvfb, socket not ready: ${socket_path}" >&2 + exit 1 + fi + + echo "[entrypoint] starting Fluxbox on DISPLAY=${DISPLAY}" + fluxbox >/tmp/fluxbox.log 2>&1 & +fi + +echo "[entrypoint] starting flow2api (headed browser mode)" if [ -n "${BROWSER_EXECUTABLE_PATH:-}" ] && [ -x "${BROWSER_EXECUTABLE_PATH}" ]; then echo "[entrypoint] browser executable: ${BROWSER_EXECUTABLE_PATH}" "${BROWSER_EXECUTABLE_PATH}" --version || true diff --git a/src/main.py b/src/main.py index b46ceb9..241f0b2 100644 --- a/src/main.py +++ b/src/main.py @@ -144,6 +144,12 @@ async def lifespan(app: FastAPI): await auto_unban_task_handle except asyncio.CancelledError: pass + # Close shared HTTP session pools + try: + await flow_client.close() + print("✓ Flow client HTTP session closed") + except Exception as e: + print(f"⚠ Flow client close failed: {e}") # Close browser if initialized if browser_service: await browser_service.close() diff --git a/src/services/browser_captcha_personal.py b/src/services/browser_captcha_personal.py index 80e6751..f02dfbb 100644 --- a/src/services/browser_captcha_personal.py +++ b/src/services/browser_captcha_personal.py @@ -52,6 +52,20 @@ def _is_truthy_env(name: str) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} +def _get_optional_bool_env(name: str) -> Optional[bool]: + """读取可选布尔环境变量,未设置或无法识别时返回 None。""" + value = os.environ.get(name) + if value is None: + return None + + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + return None + + ALLOW_DOCKER_HEADED = ( _is_truthy_env("ALLOW_DOCKER_HEADED_CAPTCHA") or _is_truthy_env("ALLOW_DOCKER_BROWSER_CAPTCHA") @@ -555,7 +569,7 @@ class BrowserCaptchaService: def __init__(self, db=None): """初始化服务""" - self.headless = True # 无头模式 + self.headless = self._resolve_headless_mode() # 默认改为有头,可用环境变量回退到无头 self.browser = None self._initialized = False self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV" @@ -640,6 +654,18 @@ class BrowserCaptchaService: f"fingerprint_ttl {old_fingerprint_ttl}s->{self._fingerprint_cache_ttl_seconds}s" ) + def _resolve_headless_mode(self) -> bool: + """personal 模式默认改为有头,仅在显式环境变量要求时回退到无头。""" + for env_name in ("PERSONAL_BROWSER_HEADLESS", "FLOW2API_PERSONAL_HEADLESS"): + override = _get_optional_bool_env(env_name) + if override is not None: + debug_logger.log_info( + f"[BrowserCaptcha] Personal headless 模式由环境变量 {env_name} 控制: {override}" + ) + return override + + return False + def _refresh_runtime_tunables(self): """刷新运行时调优参数,缺省时使用保守的低开销默认值。""" try: @@ -1454,6 +1480,7 @@ class BrowserCaptchaService: self._proxy_url = f"{protocol}://{host}:{port}" debug_logger.log_info(f"[BrowserCaptcha] Personal 浏览器代理: {self._proxy_url}") + launch_in_background = bool(getattr(config, "browser_launch_background", True)) browser_args = [ '--disable-quic', '--disable-features=UseDnsHttpsSvcb', @@ -1463,7 +1490,6 @@ class BrowserCaptchaService: '--disable-infobars', '--hide-scrollbars', '--window-size=1280,720', - '--window-position=3000,3000', '--profile-directory=Default', '--disable-background-networking', '--disable-sync', @@ -1473,6 +1499,20 @@ class BrowserCaptchaService: '--no-default-browser-check', '--no-zygote', ] + if launch_in_background and not self.headless: + browser_args.extend([ + '--start-minimized', + '--disable-background-timer-throttling', + '--disable-renderer-backgrounding', + '--disable-backgrounding-occluded-windows', + ]) + if sys.platform.startswith("win"): + browser_args.append('--window-position=-32000,-32000') + else: + browser_args.append('--window-position=3000,3000') + debug_logger.log_info("[BrowserCaptcha] Personal 有头浏览器将以后台模式启动") + elif not self.headless: + debug_logger.log_info("[BrowserCaptcha] Personal 有头浏览器将以可见窗口模式启动") if proxy_server_arg: browser_args.append(proxy_server_arg) if self._proxy_ext_dir: @@ -1503,7 +1543,7 @@ class BrowserCaptchaService: debug_logger.log_info( "[BrowserCaptcha] nodriver 启动上下文: " f"docker={IS_DOCKER}, display={display_value or ''}, " - f"uid={effective_uid}, headless={self.headless}, sandbox=False, " + f"uid={effective_uid}, headless={self.headless}, background={launch_in_background}, sandbox=False, " f"executable={browser_executable_path or ''}, " f"args={' '.join(effective_launch_args)}" ) diff --git a/src/services/flow_client.py b/src/services/flow_client.py index dae88bc..2b6c297 100644 --- a/src/services/flow_client.py +++ b/src/services/flow_client.py @@ -38,6 +38,17 @@ class FlowClient: default=None ) self._remote_browser_prefill_last_sent: Dict[str, float] = {} + self._http_session: Optional[AsyncSession] = None + self._http_session_loop: Optional[asyncio.AbstractEventLoop] = None + self._http_session_lock = asyncio.Lock() + self._image_launch_condition = asyncio.Condition() + self._video_launch_condition = asyncio.Condition() + self._image_launch_inflight = 0 + self._video_launch_inflight = 0 + self._image_launch_stagger_lock = asyncio.Lock() + self._video_launch_stagger_lock = asyncio.Lock() + self._image_launch_next_at = 0.0 + self._video_launch_next_at = 0.0 # Default "real browser" headers (Android Chrome style) to reduce upstream 4xx/5xx instability. # These will be applied as defaults (won't override caller-provided headers). @@ -252,65 +263,65 @@ class FlowClient: start_time = time.time() try: - async with AsyncSession() as session: - if method.upper() == "GET": - response = await session.get( - url, - headers=headers, - proxy=proxy_url, - timeout=request_timeout, - impersonate="chrome110" - ) - else: # POST - response = await session.post( - url, - headers=headers, - json=json_data, - proxy=proxy_url, - timeout=request_timeout, - impersonate="chrome110" - ) + session = await self._get_http_session() + if method.upper() == "GET": + response = await session.get( + url, + headers=headers, + proxy=proxy_url, + timeout=request_timeout, + impersonate="chrome110" + ) + else: # POST + response = await session.post( + url, + headers=headers, + json=json_data, + proxy=proxy_url, + timeout=request_timeout, + impersonate="chrome110" + ) - duration_ms = (time.time() - start_time) * 1000 + duration_ms = (time.time() - start_time) * 1000 - # Log response - if config.debug_enabled: - debug_logger.log_response( - status_code=response.status_code, - headers=dict(response.headers), - body=response.text, - duration_ms=duration_ms - ) + # Log response + if config.debug_enabled: + debug_logger.log_response( + status_code=response.status_code, + headers=dict(response.headers), + body=response.text, + duration_ms=duration_ms + ) - # 检查HTTP错误 - if response.status_code >= 400: - # 解析错误响应 - error_reason = f"HTTP Error {response.status_code}" - try: - error_body = response.json() - # 提取 Google API 错误格式中的 reason - if "error" in error_body: - error_info = error_body["error"] - error_message = error_info.get("message", "") - # 从 details 中提取 reason - details = error_info.get("details", []) - for detail in details: - if detail.get("reason"): - error_reason = detail.get("reason") - break - if error_message: - error_reason = f"{error_reason}: {error_message}" - except: - error_reason = f"HTTP Error {response.status_code}: {response.text[:200]}" - - # 失败时输出请求体和错误内容到控制台 - debug_logger.log_error(f"[API FAILED] URL: {url}") - debug_logger.log_error(f"[API FAILED] Request Body: {json_data}") - debug_logger.log_error(f"[API FAILED] Response: {response.text}") - - raise Exception(error_reason) + # 检查HTTP错误 + if response.status_code >= 400: + # 解析错误响应 + error_reason = f"HTTP Error {response.status_code}" + try: + error_body = response.json() + # 提取 Google API 错误格式中的 reason + if "error" in error_body: + error_info = error_body["error"] + error_message = error_info.get("message", "") + # 从 details 中提取 reason + details = error_info.get("details", []) + for detail in details: + if detail.get("reason"): + error_reason = detail.get("reason") + break + if error_message: + error_reason = f"{error_reason}: {error_message}" + except Exception: + error_reason = f"HTTP Error {response.status_code}: {response.text[:200]}" - return response.json() + # 失败时输出请求体和错误内容到控制台 + debug_logger.log_error(f"[API FAILED] URL: {url}") + debug_logger.log_error(f"[API FAILED] Request Body: {json_data}") + debug_logger.log_error(f"[API FAILED] Response: {response.text}") + + raise Exception(error_reason) + + return response.json() except Exception as e: duration_ms = (time.time() - start_time) * 1000 @@ -462,29 +473,232 @@ class FlowClient: """控制轻量控制面请求的超时,避免认证/项目接口长时间挂起。""" return max(5, min(int(self.timeout or 0) or 120, 10)) + def _get_http_pool_size(self) -> int: + """根据软并发配置决定复用连接池大小,避免瞬时创建过多 socket。""" + configured_limits: list[int] = [] + for raw_value in ( + getattr(config, "flow_image_launch_soft_limit", 0), + getattr(config, "flow_video_launch_soft_limit", 0), + ): + try: + configured_limits.append(max(0, int(raw_value or 0))) + except Exception: + configured_limits.append(0) + + peak_limit = max(configured_limits + [10]) + return max(10, min(64, peak_limit)) + + async def _get_http_session(self) -> AsyncSession: + """复用上游 HTTP 会话,减少 Docker 内高并发时的临时端口占用。""" + current_loop = asyncio.get_running_loop() + if self._http_session is not None and self._http_session_loop is current_loop: + return self._http_session + + async with self._http_session_lock: + if self._http_session is not None and self._http_session_loop is not current_loop: + try: + await self._http_session.close() + except Exception: + pass + self._http_session = None + self._http_session_loop = None + + if self._http_session is None: + max_clients = self._get_http_pool_size() + self._http_session = AsyncSession(max_clients=max_clients) + self._http_session_loop = current_loop + debug_logger.log_info( + f"[HTTP] 已创建复用 AsyncSession (max_clients={max_clients})" + ) + + return self._http_session + + async def close(self): + """关闭复用 HTTP 会话。""" + async with self._http_session_lock: + session = self._http_session + self._http_session = None + self._http_session_loop = None + + if session is not None: + try: + await session.close() + except Exception as e: + debug_logger.log_warning(f"[HTTP] 关闭复用 AsyncSession 失败: {e}") + + async def _apply_launch_stagger(self, generation_type: str, stagger_ms: int) -> int: + """进入打码前做轻量错峰,减少同一时刻批量占满端口。""" + try: + normalized_stagger_ms = max(0, int(stagger_ms or 0)) + except Exception: + normalized_stagger_ms = 0 + + if normalized_stagger_ms <= 0: + return 0 + + interval_seconds = normalized_stagger_ms / 1000.0 + lock = ( + self._image_launch_stagger_lock + if generation_type == "image" + else self._video_launch_stagger_lock + ) + next_attr = ( + "_image_launch_next_at" + if generation_type == "image" + else "_video_launch_next_at" + ) + + async with lock: + now = time.monotonic() + next_available_at = float(getattr(self, next_attr, 0.0) or 0.0) + scheduled_at = max(now, next_available_at) + setattr(self, next_attr, scheduled_at + interval_seconds) + + wait_seconds = max(0.0, scheduled_at - time.monotonic()) + if wait_seconds > 0: + await asyncio.sleep(wait_seconds) + return int(wait_seconds * 1000) + + async def _acquire_launch_gate( + self, + *, + generation_type: str, + token_id: Optional[int], + soft_limit: int, + wait_timeout_seconds: float, + stagger_ms: int, + ) -> tuple[bool, int, int]: + """控制进入打码阶段的软并发,降低 Docker 内短时端口/线程压力。""" + try: + normalized_limit = max(0, int(soft_limit or 0)) + except Exception: + normalized_limit = 0 + + if normalized_limit <= 0: + return True, 0, 0 + + wait_started_at = time.monotonic() + deadline = wait_started_at + max(1.0, float(wait_timeout_seconds or 0)) + condition = ( + self._image_launch_condition + if generation_type == "image" + else self._video_launch_condition + ) + inflight_attr = ( + "_image_launch_inflight" + if generation_type == "image" + else "_video_launch_inflight" + ) + current_inflight = 0 + + while True: + async with condition: + current_inflight = int(getattr(self, inflight_attr, 0) or 0) + if current_inflight < normalized_limit: + current_inflight += 1 + setattr(self, inflight_attr, current_inflight) + break + + remaining_seconds = deadline - time.monotonic() + if remaining_seconds <= 0: + waited_ms = int((time.monotonic() - wait_started_at) * 1000) + debug_logger.log_warning( + f"[LAUNCH_GATE] {generation_type} 等待超时 " + f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, waited={waited_ms}ms)" + ) + return False, waited_ms, 0 + + try: + await asyncio.wait_for(condition.wait(), timeout=remaining_seconds) + except asyncio.TimeoutError: + waited_ms = int((time.monotonic() - wait_started_at) * 1000) + debug_logger.log_warning( + f"[LAUNCH_GATE] {generation_type} 等待超时 " + f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, waited={waited_ms}ms)" + ) + return False, waited_ms, 0 + + waited_ms = int((time.monotonic() - wait_started_at) * 1000) + try: + stagger_wait_ms = await self._apply_launch_stagger( + generation_type=generation_type, + stagger_ms=stagger_ms, + ) + except Exception: + await self._release_launch_gate(generation_type=generation_type, token_id=token_id) + raise + + debug_logger.log_info( + f"[LAUNCH_GATE] {generation_type} 已放行 " + f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, " + f"queue_wait={waited_ms}ms, stagger_wait={stagger_wait_ms}ms)" + ) + return True, waited_ms, stagger_wait_ms + + async def _release_launch_gate(self, *, generation_type: str, token_id: Optional[int]): + condition = ( + self._image_launch_condition + if generation_type == "image" + else self._video_launch_condition + ) + inflight_attr = ( + "_image_launch_inflight" + if generation_type == "image" + else "_video_launch_inflight" + ) + + async with condition: + current_inflight = int(getattr(self, inflight_attr, 0) or 0) + if current_inflight <= 0: + setattr(self, inflight_attr, 0) + debug_logger.log_warning( + f"[LAUNCH_GATE] {generation_type} release 时 inflight 已为 0 (token={token_id})" + ) + return + + new_inflight = current_inflight - 1 + setattr(self, inflight_attr, new_inflight) + condition.notify(1) + + debug_logger.log_info( + f"[LAUNCH_GATE] {generation_type} 已释放 (token={token_id}, inflight={new_inflight})" + ) + async def _acquire_image_launch_gate( self, token_id: Optional[int], token_image_concurrency: Optional[int], ) -> tuple[bool, int, int]: - """图片请求不再做本地发车排队,直接进入取 token 并提交上游。""" - return True, 0, 0 + """图片请求进入打码前的软并发整形。""" + return await self._acquire_launch_gate( + generation_type="image", + token_id=token_id, + soft_limit=getattr(config, "flow_image_launch_soft_limit", 0), + wait_timeout_seconds=getattr(config, "flow_image_launch_wait_timeout", 180), + stagger_ms=getattr(config, "flow_image_launch_stagger_ms", 0), + ) async def _release_image_launch_gate(self, token_id: Optional[int]): - """保留接口形状,当前无需释放任何本地发车状态。""" - return + """释放图片发车软并发槽位。""" + await self._release_launch_gate(generation_type="image", token_id=token_id) async def _acquire_video_launch_gate( self, token_id: Optional[int], token_video_concurrency: Optional[int], ) -> tuple[bool, int, int]: - """视频请求不再做本地发车排队,直接进入取 token 并提交上游。""" - return True, 0, 0 + """视频请求进入打码前的软并发整形。""" + return await self._acquire_launch_gate( + generation_type="video", + token_id=token_id, + soft_limit=getattr(config, "flow_video_launch_soft_limit", 0), + wait_timeout_seconds=getattr(config, "flow_video_launch_wait_timeout", 180), + stagger_ms=getattr(config, "flow_video_launch_stagger_ms", 0), + ) async def _release_video_launch_gate(self, token_id: Optional[int]): - """保留接口形状,当前无需释放任何本地发车状态。""" - return + """释放视频发车软并发槽位。""" + await self._release_launch_gate(generation_type="video", token_id=token_id) async def _make_image_generation_request( self, @@ -2445,53 +2659,53 @@ class FlowClient: try: # Do not use curl_cffi impersonation for captcha API JSON endpoints: some ASGI # servers (for example FastAPI/Uvicorn) may receive an empty body and return 422. - async with AsyncSession() as session: - create_url = f"{base_url}/createTask" - create_data = { - "clientKey": client_key, - "task": { - "websiteURL": website_url, - "websiteKey": website_key, - "type": task_type, - "pageAction": page_action - } + session = await self._get_http_session() + create_url = f"{base_url}/createTask" + create_data = { + "clientKey": client_key, + "task": { + "websiteURL": website_url, + "websiteKey": website_key, + "type": task_type, + "pageAction": page_action } + } - result = await session.post(create_url, json=create_data) - result_json = result.json() - task_id = result_json.get('taskId') + result = await session.post(create_url, json=create_data) + result_json = result.json() + task_id = result_json.get('taskId') - debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}") + debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}") - if not task_id: - error_desc = result_json.get('errorDescription', 'Unknown error') - debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}") - return None - - get_url = f"{base_url}/getTaskResult" - for i in range(40): - get_data = { - "clientKey": client_key, - "taskId": task_id - } - result = await session.post(get_url, json=get_data) - result_json = result.json() - - debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}") - - status = result_json.get('status') - if status == 'ready': - solution = result_json.get('solution', {}) - response = solution.get('gRecaptchaResponse') - if response: - debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功") - return response - - await asyncio.sleep(3) - - debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token") + if not task_id: + error_desc = result_json.get('errorDescription', 'Unknown error') + debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}") return None + get_url = f"{base_url}/getTaskResult" + for i in range(40): + get_data = { + "clientKey": client_key, + "taskId": task_id + } + result = await session.post(get_url, json=get_data) + result_json = result.json() + + debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}") + + status = result_json.get('status') + if status == 'ready': + solution = result_json.get('solution', {}) + response = solution.get('gRecaptchaResponse') + if response: + debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功") + return response + + await asyncio.sleep(3) + + debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token") + return None + except Exception as e: debug_logger.log_error(f"[reCAPTCHA {method}] error: {str(e)}") return None diff --git a/tests/testflow_client_launch_gate.py b/tests/testflow_client_launch_gate.py new file mode 100644 index 0000000..12c7ede --- /dev/null +++ b/tests/testflow_client_launch_gate.py @@ -0,0 +1,68 @@ +import asyncio +import unittest + +from src.core.config import config +from src.services.flow_client import FlowClient + + +class FlowClientLaunchGateTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.client = FlowClient(proxy_manager=None) + flow_config = config.get_raw_config().setdefault("flow", {}) + self._original_values = { + "image_launch_soft_limit": flow_config.get("image_launch_soft_limit"), + "image_launch_wait_timeout": flow_config.get("image_launch_wait_timeout"), + "image_launch_stagger_ms": flow_config.get("image_launch_stagger_ms"), + } + flow_config["image_launch_soft_limit"] = 1 + flow_config["image_launch_wait_timeout"] = 5 + flow_config["image_launch_stagger_ms"] = 0 + + async def asyncTearDown(self): + flow_config = config.get_raw_config().setdefault("flow", {}) + for key, value in self._original_values.items(): + if value is None: + flow_config.pop(key, None) + else: + flow_config[key] = value + await self.client.close() + + async def test_http_session_reused_within_same_loop(self): + session_a = await self.client._get_http_session() + session_b = await self.client._get_http_session() + + self.assertIs(session_a, session_b) + + async def test_image_launch_gate_blocks_until_release(self): + ok_first, wait_first_ms, stagger_first_ms = await self.client._acquire_image_launch_gate( + token_id=101, + token_image_concurrency=8, + ) + + self.assertTrue(ok_first) + self.assertGreaterEqual(wait_first_ms, 0) + self.assertEqual(stagger_first_ms, 0) + + second_task = asyncio.create_task( + self.client._acquire_image_launch_gate( + token_id=202, + token_image_concurrency=8, + ) + ) + + await asyncio.sleep(0.05) + self.assertFalse(second_task.done()) + + await self.client._release_image_launch_gate(101) + + ok_second, wait_second_ms, stagger_second_ms = await asyncio.wait_for(second_task, timeout=1.0) + + self.assertTrue(ok_second) + self.assertGreaterEqual(wait_second_ms, 40) + self.assertEqual(stagger_second_ms, 0) + + await self.client._release_image_launch_gate(202) + + +if __name__ == "__main__": + unittest.main()