fix(browser): 修复内置有头浏览器与并发端口占满

This commit is contained in:
genz27
2026-04-17 15:21:15 +08:00
parent da72d58d5b
commit 36f8cfc54c
9 changed files with 480 additions and 108 deletions

View File

@@ -5,8 +5,18 @@ WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
ALLOW_DOCKER_HEADED_CAPTCHA=true \
DISPLAY=:99 \
PERSONAL_BROWSER_HEADLESS=0 \
PLAYWRIGHT_BROWSERS_PATH=0
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
fluxbox \
x11-utils \
xauth \
xvfb \
&& rm -rf /var/lib/apt/lists/*
# 在镜像构建阶段预装 Playwright Chromium供 personal/browser 模式复用
COPY requirements.txt ./

View File

@@ -72,6 +72,7 @@ docker-compose -f docker-compose.warp.yml logs -f
> 适用于你有虚拟化桌面需求、希望在容器里启用有头浏览器打码的场景。
> 该模式默认启动 `Xvfb + Fluxbox` 实现容器内部可视化,并设置 `ALLOW_DOCKER_HEADED_CAPTCHA=true`。
> 仅开放应用端口,不提供任何远程桌面连接端口。
> `personal` 内置浏览器现在默认按有头模式启动;如需临时切回无头,可额外设置环境变量 `PERSONAL_BROWSER_HEADLESS=true`。
```bash
# 启动有头模式(首次建议带 --build)

View File

@@ -55,6 +55,7 @@ base_url = "" # 缓存文件访问的基础URL, 留空则使用服务器地址
[captcha]
captcha_method = "browser" # 打码方式: yescaptcha/browser/personal/remote_browser
browser_launch_background = true # 有头浏览器是否默认后台启动;设为 false 可直接看到窗口
browser_recaptcha_settle_seconds = 3.0 # reload/clr 就绪后的额外稳态等待
browser_count = 1 # browser 模式的有头浏览器实例数量
personal_project_pool_size = 4 # personal 模式下单个 Token 默认维护的项目池数量(仅影响项目轮换,不决定打码标签页数量)

View File

@@ -2,6 +2,7 @@ version: '3.8'
services:
flow2api-headed:
init: true
build:
context: .
dockerfile: Dockerfile.headed
@@ -16,5 +17,7 @@ services:
environment:
- PYTHONUNBUFFERED=1
- ALLOW_DOCKER_HEADED_CAPTCHA=true
- DISPLAY=:99
- XVFB_SCREEN=1440x900x24
shm_size: "2gb"
restart: unless-stopped

View File

@@ -1,6 +1,10 @@
#!/bin/sh
set -eu
DISPLAY_VALUE="${DISPLAY:-:99}"
XVFB_SCREEN_VALUE="${XVFB_SCREEN:-1440x900x24}"
export DISPLAY="${DISPLAY_VALUE}"
resolve_browser_path() {
python - <<'PY'
from playwright.sync_api import sync_playwright
@@ -17,7 +21,32 @@ if [ -z "${BROWSER_EXECUTABLE_PATH:-}" ] || [ ! -x "${BROWSER_EXECUTABLE_PATH:-}
fi
fi
echo "[entrypoint] starting flow2api (headless browser mode)"
if [ "${ALLOW_DOCKER_HEADED_CAPTCHA:-true}" = "true" ] || [ "${ALLOW_DOCKER_HEADED_CAPTCHA:-1}" = "1" ]; then
display_suffix="$(printf '%s' "${DISPLAY}" | sed 's/^://; s/\..*$//')"
socket_path="/tmp/.X11-unix/X${display_suffix}"
mkdir -p /tmp/.X11-unix
rm -f "/tmp/.X${display_suffix}-lock"
echo "[entrypoint] starting Xvfb on DISPLAY=${DISPLAY} (${XVFB_SCREEN_VALUE})"
Xvfb "${DISPLAY}" -screen 0 "${XVFB_SCREEN_VALUE}" -ac +extension RANDR >/tmp/xvfb.log 2>&1 &
waited=0
while [ ! -S "${socket_path}" ] && [ "${waited}" -lt 100 ]; do
sleep 0.1
waited=$((waited + 1))
done
if [ ! -S "${socket_path}" ]; then
echo "[entrypoint] failed to start Xvfb, socket not ready: ${socket_path}" >&2
exit 1
fi
echo "[entrypoint] starting Fluxbox on DISPLAY=${DISPLAY}"
fluxbox >/tmp/fluxbox.log 2>&1 &
fi
echo "[entrypoint] starting flow2api (headed browser mode)"
if [ -n "${BROWSER_EXECUTABLE_PATH:-}" ] && [ -x "${BROWSER_EXECUTABLE_PATH}" ]; then
echo "[entrypoint] browser executable: ${BROWSER_EXECUTABLE_PATH}"
"${BROWSER_EXECUTABLE_PATH}" --version || true

View File

@@ -144,6 +144,12 @@ async def lifespan(app: FastAPI):
await auto_unban_task_handle
except asyncio.CancelledError:
pass
# Close shared HTTP session pools
try:
await flow_client.close()
print("✓ Flow client HTTP session closed")
except Exception as e:
print(f"⚠ Flow client close failed: {e}")
# Close browser if initialized
if browser_service:
await browser_service.close()

View File

@@ -52,6 +52,20 @@ def _is_truthy_env(name: str) -> bool:
return value.strip().lower() in {"1", "true", "yes", "on"}
def _get_optional_bool_env(name: str) -> Optional[bool]:
"""读取可选布尔环境变量,未设置或无法识别时返回 None。"""
value = os.environ.get(name)
if value is None:
return None
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
return None
ALLOW_DOCKER_HEADED = (
_is_truthy_env("ALLOW_DOCKER_HEADED_CAPTCHA")
or _is_truthy_env("ALLOW_DOCKER_BROWSER_CAPTCHA")
@@ -555,7 +569,7 @@ class BrowserCaptchaService:
def __init__(self, db=None):
"""初始化服务"""
self.headless = True # 无头模式
self.headless = self._resolve_headless_mode() # 默认改为有头,可用环境变量回退到无头
self.browser = None
self._initialized = False
self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
@@ -640,6 +654,18 @@ class BrowserCaptchaService:
f"fingerprint_ttl {old_fingerprint_ttl}s->{self._fingerprint_cache_ttl_seconds}s"
)
def _resolve_headless_mode(self) -> bool:
    """Decide headless vs headed for personal-mode browsers.

    Defaults to headed (False); an explicit boolean in either override
    environment variable wins, checked in priority order.
    """
    for env_name in ("PERSONAL_BROWSER_HEADLESS", "FLOW2API_PERSONAL_HEADLESS"):
        override = _get_optional_bool_env(env_name)
        if override is None:
            continue
        debug_logger.log_info(
            f"[BrowserCaptcha] Personal headless 模式由环境变量 {env_name} 控制: {override}"
        )
        return override
    return False
def _refresh_runtime_tunables(self):
"""刷新运行时调优参数,缺省时使用保守的低开销默认值。"""
try:
@@ -1454,6 +1480,7 @@ class BrowserCaptchaService:
self._proxy_url = f"{protocol}://{host}:{port}"
debug_logger.log_info(f"[BrowserCaptcha] Personal 浏览器代理: {self._proxy_url}")
launch_in_background = bool(getattr(config, "browser_launch_background", True))
browser_args = [
'--disable-quic',
'--disable-features=UseDnsHttpsSvcb',
@@ -1463,7 +1490,6 @@ class BrowserCaptchaService:
'--disable-infobars',
'--hide-scrollbars',
'--window-size=1280,720',
'--window-position=3000,3000',
'--profile-directory=Default',
'--disable-background-networking',
'--disable-sync',
@@ -1473,6 +1499,20 @@ class BrowserCaptchaService:
'--no-default-browser-check',
'--no-zygote',
]
if launch_in_background and not self.headless:
browser_args.extend([
'--start-minimized',
'--disable-background-timer-throttling',
'--disable-renderer-backgrounding',
'--disable-backgrounding-occluded-windows',
])
if sys.platform.startswith("win"):
browser_args.append('--window-position=-32000,-32000')
else:
browser_args.append('--window-position=3000,3000')
debug_logger.log_info("[BrowserCaptcha] Personal 有头浏览器将以后台模式启动")
elif not self.headless:
debug_logger.log_info("[BrowserCaptcha] Personal 有头浏览器将以可见窗口模式启动")
if proxy_server_arg:
browser_args.append(proxy_server_arg)
if self._proxy_ext_dir:
@@ -1503,7 +1543,7 @@ class BrowserCaptchaService:
debug_logger.log_info(
"[BrowserCaptcha] nodriver 启动上下文: "
f"docker={IS_DOCKER}, display={display_value or '<empty>'}, "
f"uid={effective_uid}, headless={self.headless}, sandbox=False, "
f"uid={effective_uid}, headless={self.headless}, background={launch_in_background}, sandbox=False, "
f"executable={browser_executable_path or '<auto>'}, "
f"args={' '.join(effective_launch_args)}"
)

View File

@@ -38,6 +38,17 @@ class FlowClient:
default=None
)
self._remote_browser_prefill_last_sent: Dict[str, float] = {}
self._http_session: Optional[AsyncSession] = None
self._http_session_loop: Optional[asyncio.AbstractEventLoop] = None
self._http_session_lock = asyncio.Lock()
self._image_launch_condition = asyncio.Condition()
self._video_launch_condition = asyncio.Condition()
self._image_launch_inflight = 0
self._video_launch_inflight = 0
self._image_launch_stagger_lock = asyncio.Lock()
self._video_launch_stagger_lock = asyncio.Lock()
self._image_launch_next_at = 0.0
self._video_launch_next_at = 0.0
# Default "real browser" headers (Android Chrome style) to reduce upstream 4xx/5xx instability.
# These will be applied as defaults (won't override caller-provided headers).
@@ -252,65 +263,65 @@ class FlowClient:
start_time = time.time()
try:
async with AsyncSession() as session:
if method.upper() == "GET":
response = await session.get(
url,
headers=headers,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
)
else: # POST
response = await session.post(
url,
headers=headers,
json=json_data,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
)
session = await self._get_http_session()
if method.upper() == "GET":
response = await session.get(
url,
headers=headers,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
)
else: # POST
response = await session.post(
url,
headers=headers,
json=json_data,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
)
duration_ms = (time.time() - start_time) * 1000
duration_ms = (time.time() - start_time) * 1000
# Log response
if config.debug_enabled:
debug_logger.log_response(
status_code=response.status_code,
headers=dict(response.headers),
body=response.text,
duration_ms=duration_ms
)
# Log response
if config.debug_enabled:
debug_logger.log_response(
status_code=response.status_code,
headers=dict(response.headers),
body=response.text,
duration_ms=duration_ms
)
# 检查HTTP错误
if response.status_code >= 400:
# 解析错误响应
error_reason = f"HTTP Error {response.status_code}"
try:
error_body = response.json()
# 提取 Google API 错误格式中的 reason
if "error" in error_body:
error_info = error_body["error"]
error_message = error_info.get("message", "")
# 从 details 中提取 reason
details = error_info.get("details", [])
for detail in details:
if detail.get("reason"):
error_reason = detail.get("reason")
break
if error_message:
error_reason = f"{error_reason}: {error_message}"
except:
error_reason = f"HTTP Error {response.status_code}: {response.text[:200]}"
# 失败时输出请求体和错误内容到控制台
debug_logger.log_error(f"[API FAILED] URL: {url}")
debug_logger.log_error(f"[API FAILED] Request Body: {json_data}")
debug_logger.log_error(f"[API FAILED] Response: {response.text}")
raise Exception(error_reason)
# 检查HTTP错误
if response.status_code >= 400:
# 解析错误响应
error_reason = f"HTTP Error {response.status_code}"
try:
error_body = response.json()
# 提取 Google API 错误格式中的 reason
if "error" in error_body:
error_info = error_body["error"]
error_message = error_info.get("message", "")
# 从 details 中提取 reason
details = error_info.get("details", [])
for detail in details:
if detail.get("reason"):
error_reason = detail.get("reason")
break
if error_message:
error_reason = f"{error_reason}: {error_message}"
except Exception:
error_reason = f"HTTP Error {response.status_code}: {response.text[:200]}"
return response.json()
# 失败时输出请求体和错误内容到控制台
debug_logger.log_error(f"[API FAILED] URL: {url}")
debug_logger.log_error(f"[API FAILED] Request Body: {json_data}")
debug_logger.log_error(f"[API FAILED] Response: {response.text}")
raise Exception(error_reason)
return response.json()
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
@@ -462,29 +473,232 @@ class FlowClient:
"""控制轻量控制面请求的超时,避免认证/项目接口长时间挂起。"""
return max(5, min(int(self.timeout or 0) or 120, 10))
def _get_http_pool_size(self) -> int:
    """Size the reusable HTTP connection pool from the soft-concurrency config.

    Takes the larger of the image/video launch soft limits (malformed values
    count as 0) and clamps the result to the range [10, 64] so a burst cannot
    open an excessive number of sockets at once.
    """
    parsed: list[int] = []
    for raw in (
        getattr(config, "flow_image_launch_soft_limit", 0),
        getattr(config, "flow_video_launch_soft_limit", 0),
    ):
        try:
            parsed.append(max(0, int(raw or 0)))
        except Exception:
            parsed.append(0)
    peak = max(parsed + [10])
    return max(10, min(64, peak))
async def _get_http_session(self) -> AsyncSession:
    """Return a shared upstream HTTP session to cut ephemeral-port usage
    under high concurrency (notably inside Docker).

    The cached session is bound to the event loop that created it; a call
    from a different loop discards the stale session and builds a new one.
    """
    # Fast path: no lock needed when the cached session already belongs to
    # the currently running loop.
    current_loop = asyncio.get_running_loop()
    if self._http_session is not None and self._http_session_loop is current_loop:
        return self._http_session
    async with self._http_session_lock:
        # A session created on another loop cannot be awaited safely here;
        # close it best-effort and rebuild below.
        if self._http_session is not None and self._http_session_loop is not current_loop:
            try:
                await self._http_session.close()
            except Exception:
                pass
            self._http_session = None
            self._http_session_loop = None
        if self._http_session is None:
            # Pool size derives from the configured launch soft limits.
            max_clients = self._get_http_pool_size()
            self._http_session = AsyncSession(max_clients=max_clients)
            self._http_session_loop = current_loop
            debug_logger.log_info(
                f"[HTTP] 已创建复用 AsyncSession (max_clients={max_clients})"
            )
        return self._http_session
async def close(self):
"""关闭复用 HTTP 会话。"""
async with self._http_session_lock:
session = self._http_session
self._http_session = None
self._http_session_loop = None
if session is not None:
try:
await session.close()
except Exception as e:
debug_logger.log_warning(f"[HTTP] 关闭复用 AsyncSession 失败: {e}")
async def _apply_launch_stagger(self, generation_type: str, stagger_ms: int) -> int:
"""进入打码前做轻量错峰,减少同一时刻批量占满端口。"""
try:
normalized_stagger_ms = max(0, int(stagger_ms or 0))
except Exception:
normalized_stagger_ms = 0
if normalized_stagger_ms <= 0:
return 0
interval_seconds = normalized_stagger_ms / 1000.0
lock = (
self._image_launch_stagger_lock
if generation_type == "image"
else self._video_launch_stagger_lock
)
next_attr = (
"_image_launch_next_at"
if generation_type == "image"
else "_video_launch_next_at"
)
async with lock:
now = time.monotonic()
next_available_at = float(getattr(self, next_attr, 0.0) or 0.0)
scheduled_at = max(now, next_available_at)
setattr(self, next_attr, scheduled_at + interval_seconds)
wait_seconds = max(0.0, scheduled_at - time.monotonic())
if wait_seconds > 0:
await asyncio.sleep(wait_seconds)
return int(wait_seconds * 1000)
async def _acquire_launch_gate(
    self,
    *,
    generation_type: str,
    token_id: Optional[int],
    soft_limit: int,
    wait_timeout_seconds: float,
    stagger_ms: int,
) -> tuple[bool, int, int]:
    """Soft-concurrency gate ahead of the captcha phase, reducing short-lived
    port/thread pressure inside Docker.

    Returns ``(admitted, queue_wait_ms, stagger_wait_ms)``. A soft limit of
    0 (or a malformed value) disables gating and admits immediately.
    """
    try:
        normalized_limit = max(0, int(soft_limit or 0))
    except Exception:
        normalized_limit = 0
    if normalized_limit <= 0:
        return True, 0, 0
    wait_started_at = time.monotonic()
    # Enforce at least a 1-second overall wait budget.
    deadline = wait_started_at + max(1.0, float(wait_timeout_seconds or 0))
    # Image and video each get an independent condition + inflight counter.
    condition = (
        self._image_launch_condition
        if generation_type == "image"
        else self._video_launch_condition
    )
    inflight_attr = (
        "_image_launch_inflight"
        if generation_type == "image"
        else "_video_launch_inflight"
    )
    current_inflight = 0
    while True:
        async with condition:
            current_inflight = int(getattr(self, inflight_attr, 0) or 0)
            if current_inflight < normalized_limit:
                # Claim a slot and leave the wait loop.
                current_inflight += 1
                setattr(self, inflight_attr, current_inflight)
                break
            remaining_seconds = deadline - time.monotonic()
            if remaining_seconds <= 0:
                waited_ms = int((time.monotonic() - wait_started_at) * 1000)
                debug_logger.log_warning(
                    f"[LAUNCH_GATE] {generation_type} 等待超时 "
                    f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, waited={waited_ms}ms)"
                )
                return False, waited_ms, 0
            try:
                # NOTE(review): if this wait times out right after being
                # notified, the notification is consumed without the slot
                # being taken and other waiters are not re-notified — a
                # possible lost wakeup until a later release. Confirm whether
                # re-notifying on timeout is needed.
                await asyncio.wait_for(condition.wait(), timeout=remaining_seconds)
            except asyncio.TimeoutError:
                waited_ms = int((time.monotonic() - wait_started_at) * 1000)
                debug_logger.log_warning(
                    f"[LAUNCH_GATE] {generation_type} 等待超时 "
                    f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, waited={waited_ms}ms)"
                )
                return False, waited_ms, 0
    waited_ms = int((time.monotonic() - wait_started_at) * 1000)
    try:
        stagger_wait_ms = await self._apply_launch_stagger(
            generation_type=generation_type,
            stagger_ms=stagger_ms,
        )
    except Exception:
        # Give the slot back if staggering itself blows up.
        await self._release_launch_gate(generation_type=generation_type, token_id=token_id)
        raise
    debug_logger.log_info(
        f"[LAUNCH_GATE] {generation_type} 已放行 "
        f"(token={token_id}, inflight={current_inflight}/{normalized_limit}, "
        f"queue_wait={waited_ms}ms, stagger_wait={stagger_wait_ms}ms)"
    )
    return True, waited_ms, stagger_wait_ms
async def _release_launch_gate(self, *, generation_type: str, token_id: Optional[int]):
    """Return one soft-concurrency slot and wake at most one queued waiter."""
    is_image = generation_type == "image"
    condition = (
        self._image_launch_condition if is_image else self._video_launch_condition
    )
    inflight_attr = "_image_launch_inflight" if is_image else "_video_launch_inflight"
    async with condition:
        current_inflight = int(getattr(self, inflight_attr, 0) or 0)
        if current_inflight <= 0:
            # Defensive clamp: never let the counter go negative.
            setattr(self, inflight_attr, 0)
            debug_logger.log_warning(
                f"[LAUNCH_GATE] {generation_type} release 时 inflight 已为 0 (token={token_id})"
            )
            return
        new_inflight = current_inflight - 1
        setattr(self, inflight_attr, new_inflight)
        condition.notify(1)
        debug_logger.log_info(
            f"[LAUNCH_GATE] {generation_type} 已释放 (token={token_id}, inflight={new_inflight})"
        )
async def _acquire_image_launch_gate(
    self,
    token_id: Optional[int],
    token_image_concurrency: Optional[int],
) -> tuple[bool, int, int]:
    """Soft-concurrency shaping for image requests before the captcha phase.

    Delegates to the shared launch gate using the image-specific config
    knobs. Returns ``(admitted, queue_wait_ms, stagger_wait_ms)``.

    Fix: removed the stale stub (docstring + ``return True, 0, 0``) that
    preceded the delegation and made it unreachable dead code.
    """
    # token_image_concurrency is accepted for interface compatibility; the
    # soft limit comes from global config, not the per-token value.
    return await self._acquire_launch_gate(
        generation_type="image",
        token_id=token_id,
        soft_limit=getattr(config, "flow_image_launch_soft_limit", 0),
        wait_timeout_seconds=getattr(config, "flow_image_launch_wait_timeout", 180),
        stagger_ms=getattr(config, "flow_image_launch_stagger_ms", 0),
    )
async def _release_image_launch_gate(self, token_id: Optional[int]):
    """Release the image launch soft-concurrency slot.

    Fix: removed the stale stub ``return`` that short-circuited the method,
    turning release into a no-op and permanently leaking gate slots.
    """
    await self._release_launch_gate(generation_type="image", token_id=token_id)
async def _acquire_video_launch_gate(
    self,
    token_id: Optional[int],
    token_video_concurrency: Optional[int],
) -> tuple[bool, int, int]:
    """Soft-concurrency shaping for video requests before the captcha phase.

    Delegates to the shared launch gate using the video-specific config
    knobs. Returns ``(admitted, queue_wait_ms, stagger_wait_ms)``.

    Fix: removed the stale stub (docstring + ``return True, 0, 0``) that
    preceded the delegation and made it unreachable dead code.
    """
    # token_video_concurrency is accepted for interface compatibility; the
    # soft limit comes from global config, not the per-token value.
    return await self._acquire_launch_gate(
        generation_type="video",
        token_id=token_id,
        soft_limit=getattr(config, "flow_video_launch_soft_limit", 0),
        wait_timeout_seconds=getattr(config, "flow_video_launch_wait_timeout", 180),
        stagger_ms=getattr(config, "flow_video_launch_stagger_ms", 0),
    )
async def _release_video_launch_gate(self, token_id: Optional[int]):
    """Release the video launch soft-concurrency slot.

    Fix: removed the stale stub ``return`` that short-circuited the method,
    turning release into a no-op and permanently leaking gate slots.
    """
    await self._release_launch_gate(generation_type="video", token_id=token_id)
async def _make_image_generation_request(
self,
@@ -2445,53 +2659,53 @@ class FlowClient:
try:
# Do not use curl_cffi impersonation for captcha API JSON endpoints: some ASGI
# servers (for example FastAPI/Uvicorn) may receive an empty body and return 422.
async with AsyncSession() as session:
create_url = f"{base_url}/createTask"
create_data = {
"clientKey": client_key,
"task": {
"websiteURL": website_url,
"websiteKey": website_key,
"type": task_type,
"pageAction": page_action
}
session = await self._get_http_session()
create_url = f"{base_url}/createTask"
create_data = {
"clientKey": client_key,
"task": {
"websiteURL": website_url,
"websiteKey": website_key,
"type": task_type,
"pageAction": page_action
}
}
result = await session.post(create_url, json=create_data)
result_json = result.json()
task_id = result_json.get('taskId')
result = await session.post(create_url, json=create_data)
result_json = result.json()
task_id = result_json.get('taskId')
debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}")
debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}")
if not task_id:
error_desc = result_json.get('errorDescription', 'Unknown error')
debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}")
return None
get_url = f"{base_url}/getTaskResult"
for i in range(40):
get_data = {
"clientKey": client_key,
"taskId": task_id
}
result = await session.post(get_url, json=get_data)
result_json = result.json()
debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}")
status = result_json.get('status')
if status == 'ready':
solution = result_json.get('solution', {})
response = solution.get('gRecaptchaResponse')
if response:
debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功")
return response
await asyncio.sleep(3)
debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token")
if not task_id:
error_desc = result_json.get('errorDescription', 'Unknown error')
debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}")
return None
get_url = f"{base_url}/getTaskResult"
for i in range(40):
get_data = {
"clientKey": client_key,
"taskId": task_id
}
result = await session.post(get_url, json=get_data)
result_json = result.json()
debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}")
status = result_json.get('status')
if status == 'ready':
solution = result_json.get('solution', {})
response = solution.get('gRecaptchaResponse')
if response:
debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功")
return response
await asyncio.sleep(3)
debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token")
return None
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA {method}] error: {str(e)}")
return None

View File

@@ -0,0 +1,68 @@
import asyncio
import unittest
from src.core.config import config
from src.services.flow_client import FlowClient
class FlowClientLaunchGateTests(unittest.IsolatedAsyncioTestCase):
    """Tests for FlowClient's shared HTTP session reuse and the image
    launch soft-concurrency gate."""

    def setUp(self):
        # Build a client without a proxy manager and pin the image gate to a
        # soft limit of 1 so a second acquire must queue.
        self.client = FlowClient(proxy_manager=None)
        flow_config = config.get_raw_config().setdefault("flow", {})
        # Remember prior values so asyncTearDown can restore the global config.
        self._original_values = {
            "image_launch_soft_limit": flow_config.get("image_launch_soft_limit"),
            "image_launch_wait_timeout": flow_config.get("image_launch_wait_timeout"),
            "image_launch_stagger_ms": flow_config.get("image_launch_stagger_ms"),
        }
        flow_config["image_launch_soft_limit"] = 1
        flow_config["image_launch_wait_timeout"] = 5
        flow_config["image_launch_stagger_ms"] = 0

    async def asyncTearDown(self):
        # Restore the mutated global flow config (None means the key was absent).
        flow_config = config.get_raw_config().setdefault("flow", {})
        for key, value in self._original_values.items():
            if value is None:
                flow_config.pop(key, None)
            else:
                flow_config[key] = value
        await self.client.close()

    async def test_http_session_reused_within_same_loop(self):
        """Two lookups on the same event loop must return the same session."""
        session_a = await self.client._get_http_session()
        session_b = await self.client._get_http_session()
        self.assertIs(session_a, session_b)

    async def test_image_launch_gate_blocks_until_release(self):
        """With soft limit 1, a second acquire queues until the first releases."""
        ok_first, wait_first_ms, stagger_first_ms = await self.client._acquire_image_launch_gate(
            token_id=101,
            token_image_concurrency=8,
        )
        self.assertTrue(ok_first)
        self.assertGreaterEqual(wait_first_ms, 0)
        self.assertEqual(stagger_first_ms, 0)
        second_task = asyncio.create_task(
            self.client._acquire_image_launch_gate(
                token_id=202,
                token_image_concurrency=8,
            )
        )
        # Give the second acquire time to hit the gate; it must still be waiting.
        await asyncio.sleep(0.05)
        self.assertFalse(second_task.done())
        await self.client._release_image_launch_gate(101)
        ok_second, wait_second_ms, stagger_second_ms = await asyncio.wait_for(second_task, timeout=1.0)
        self.assertTrue(ok_second)
        # The second caller waited at least through the 50 ms sleep window.
        self.assertGreaterEqual(wait_second_ms, 40)
        self.assertEqual(stagger_second_ms, 0)
        await self.client._release_image_launch_gate(202)