Remove local launch throttling

This commit is contained in:
genz27
2026-03-08 03:15:09 +08:00
parent 58b388df30
commit d5b0688db5
2 changed files with 18 additions and 196 deletions

View File

@@ -46,14 +46,8 @@ class FlowClient:
"x-browser-year": "2026",
"x-client-data": "CJS2yQEIpLbJAQipncoBCNj9ygEIlKHLAQiFoM0BGP6lzwE="
}
# 图片打码前置整形:限制“取验证码”阶段的并发波峰,不影响上游生成并发。
self._image_launch_gate_lock = asyncio.Lock()
self._image_launch_gate_inflight: Dict[int, int] = {}
self._image_next_launch_at: Dict[int, float] = {}
# 视频打码前置整形:限制“取验证码”阶段的并发波峰,不影响上游生成并发。
self._video_launch_gate_lock = asyncio.Lock()
self._video_launch_gate_inflight: Dict[int, int] = {}
self._video_next_launch_at: Dict[int, float] = {}
# 发车策略改为“请求到就发”:
# 不在 flow2api 本地对提交做批次整形或排队,避免把同批请求打成阶梯。
def _generate_user_agent(self, account_id: str = None) -> str:
"""基于账号ID生成固定的 User-Agent
@@ -327,162 +321,29 @@ class FlowClient:
"operation timed out",
])
def _resolve_image_launch_soft_limit(self, token_image_concurrency: Optional[int]) -> Optional[int]:
"""解析图片取验证码阶段软并发上限。None 表示不限制。"""
configured = config.flow_image_launch_soft_limit
if configured <= 0:
return None
if token_image_concurrency and token_image_concurrency > 0:
return max(1, min(configured, int(token_image_concurrency)))
return max(1, configured)
async def _acquire_image_launch_gate(
self,
token_id: Optional[int],
token_image_concurrency: Optional[int],
) -> tuple[bool, int, int]:
"""
控制图片取验证码阶段的并发波峰,减少同批请求互相挤压。
Returns:
(ok, queue_wait_ms, stagger_wait_ms)
"""
if token_id is None:
return True, 0, 0
wait_started = time.monotonic()
wait_timeout = config.flow_image_launch_wait_timeout
deadline = wait_started + wait_timeout
launch_limit = self._resolve_image_launch_soft_limit(token_image_concurrency)
stagger_seconds = max(0, config.flow_image_launch_stagger_ms) / 1000.0
if str(getattr(config, "captcha_method", "")).strip().lower() in {"browser", "remote_browser"}:
# For headed browser token acquisition, avoid artificial staggering so the same batch can start closer together.
stagger_seconds = 0.0
while True:
now = time.monotonic()
stagger_wait = 0.0
async with self._image_launch_gate_lock:
inflight = self._image_launch_gate_inflight.get(token_id, 0)
if launch_limit is None or inflight < launch_limit:
self._image_launch_gate_inflight[token_id] = inflight + 1
if stagger_seconds > 0:
next_allowed = self._image_next_launch_at.get(token_id, now)
if next_allowed > now:
stagger_wait = next_allowed - now
self._image_next_launch_at[token_id] = max(now, next_allowed) + stagger_seconds
queue_wait_ms = int((now - wait_started) * 1000)
if stagger_wait <= 0:
return True, queue_wait_ms, 0
break
if now >= deadline:
queue_wait_ms = int((now - wait_started) * 1000)
return False, queue_wait_ms, 0
await asyncio.sleep(0.05)
await asyncio.sleep(stagger_wait)
stagger_wait_ms = int(stagger_wait * 1000)
queue_wait_ms = int((time.monotonic() - wait_started) * 1000) - stagger_wait_ms
if queue_wait_ms < 0:
queue_wait_ms = 0
return True, queue_wait_ms, stagger_wait_ms
"""图片请求不再做本地发车排队,直接进入取 token 并提交上游。"""
return True, 0, 0
async def _release_image_launch_gate(self, token_id: Optional[int]):
"""释放图片取验证码阶段软并发占位"""
if token_id is None:
return
async with self._image_launch_gate_lock:
inflight = self._image_launch_gate_inflight.get(token_id, 0)
if inflight <= 0:
self._image_launch_gate_inflight[token_id] = 0
return
self._image_launch_gate_inflight[token_id] = inflight - 1
def _resolve_video_launch_soft_limit(self, token_video_concurrency: Optional[int]) -> Optional[int]:
"""解析视频取验证码阶段软并发上限。None 表示不限制。"""
configured = config.flow_video_launch_soft_limit
if configured <= 0:
return None
if token_video_concurrency and token_video_concurrency > 0:
return max(1, min(configured, int(token_video_concurrency)))
return max(1, configured)
"""保留接口形状,当前无需释放任何本地发车状态"""
return
async def _acquire_video_launch_gate(
self,
token_id: Optional[int],
token_video_concurrency: Optional[int],
) -> tuple[bool, int, int]:
"""
控制视频取验证码阶段的并发波峰,减少同批请求互相挤压。
Returns:
(ok, queue_wait_ms, stagger_wait_ms)
"""
if token_id is None:
return True, 0, 0
wait_started = time.monotonic()
wait_timeout = config.flow_video_launch_wait_timeout
deadline = wait_started + wait_timeout
launch_limit = self._resolve_video_launch_soft_limit(token_video_concurrency)
stagger_seconds = max(0, config.flow_video_launch_stagger_ms) / 1000.0
if str(getattr(config, "captcha_method", "")).strip().lower() in {"browser", "remote_browser"}:
stagger_seconds = 0.0
while True:
now = time.monotonic()
stagger_wait = 0.0
async with self._video_launch_gate_lock:
inflight = self._video_launch_gate_inflight.get(token_id, 0)
if launch_limit is None or inflight < launch_limit:
self._video_launch_gate_inflight[token_id] = inflight + 1
if stagger_seconds > 0:
next_allowed = self._video_next_launch_at.get(token_id, now)
if next_allowed > now:
stagger_wait = next_allowed - now
self._video_next_launch_at[token_id] = max(now, next_allowed) + stagger_seconds
queue_wait_ms = int((now - wait_started) * 1000)
if stagger_wait <= 0:
return True, queue_wait_ms, 0
break
if now >= deadline:
queue_wait_ms = int((now - wait_started) * 1000)
return False, queue_wait_ms, 0
await asyncio.sleep(0.05)
await asyncio.sleep(stagger_wait)
stagger_wait_ms = int(stagger_wait * 1000)
queue_wait_ms = int((time.monotonic() - wait_started) * 1000) - stagger_wait_ms
if queue_wait_ms < 0:
queue_wait_ms = 0
return True, queue_wait_ms, stagger_wait_ms
"""视频请求不再做本地发车排队,直接进入取 token 并提交上游。"""
return True, 0, 0
async def _release_video_launch_gate(self, token_id: Optional[int]):
"""释放视频取验证码阶段软并发占位"""
if token_id is None:
return
async with self._video_launch_gate_lock:
inflight = self._video_launch_gate_inflight.get(token_id, 0)
if inflight <= 0:
self._video_launch_gate_inflight[token_id] = 0
return
self._video_launch_gate_inflight[token_id] = inflight - 1
"""保留接口形状,当前无需释放任何本地发车状态"""
return
async def _make_image_generation_request(
self,

View File

@@ -714,7 +714,6 @@ class GenerationHandler:
start_time = time.time()
token = None
generation_type = None
token_slot_reserved = False
pending_token_state = {"active": False}
request_id = f"gen-{int(start_time * 1000)}-{id(asyncio.current_task())}"
perf_trace: Dict[str, Any] = {
@@ -797,7 +796,6 @@ class GenerationHandler:
yield self._create_error_response(error_msg)
return
token_slot_reserved = False
debug_logger.log_info(f"[GENERATION] 已选择Token: {token.id} ({token.email})")
try:
@@ -829,22 +827,16 @@ class GenerationHandler:
generation_pipeline_started_at = time.time()
if generation_type == "image":
debug_logger.log_info(f"[GENERATION] 开始图片生成流程...")
slot_reserved_for_handler = token_slot_reserved
token_slot_reserved = False
async for chunk in self._handle_image_generation(
token, project_id, model_config, prompt, images, stream,
slot_reserved=slot_reserved_for_handler,
perf_trace=perf_trace,
pending_token_state=pending_token_state
):
yield chunk
else: # video
debug_logger.log_info(f"[GENERATION] 开始视频生成流程...")
slot_reserved_for_handler = token_slot_reserved
token_slot_reserved = False
async for chunk in self._handle_video_generation(
token, project_id, model_config, prompt, images, stream,
slot_reserved=slot_reserved_for_handler,
perf_trace=perf_trace,
pending_token_state=pending_token_state
):
@@ -940,11 +932,6 @@ class GenerationHandler:
)
pending_token_state["active"] = False
if token_slot_reserved and token and self.concurrency_manager:
if generation_type == "image":
await self.concurrency_manager.release_image(token.id)
elif generation_type == "video":
await self.concurrency_manager.release_video(token.id)
def _get_no_token_error_message(self, generation_type: str) -> str:
"""获取无可用Token时的详细错误信息"""
@@ -961,30 +948,19 @@ class GenerationHandler:
prompt: str,
images: Optional[List[bytes]],
stream: bool,
slot_reserved: bool = False,
perf_trace: Optional[Dict[str, Any]] = None,
pending_token_state: Optional[Dict[str, bool]] = None
) -> AsyncGenerator:
"""处理图片生成 (同步返回)"""
slot_acquired = False
image_trace: Optional[Dict[str, Any]] = None
if isinstance(perf_trace, dict):
image_trace = perf_trace.setdefault("image_generation", {})
image_trace["input_image_count"] = len(images) if images else 0
# 获取并发槽位
if self.concurrency_manager and not slot_reserved:
slot_ok, slot_wait_ms = await self.concurrency_manager.wait_acquire_image(
token.id,
timeout_seconds=config.flow_image_slot_wait_timeout
)
if image_trace is not None:
image_trace["slot_wait_ms"] = slot_wait_ms
if not slot_ok:
yield self._create_error_response("图片并发限制已达上限")
return
slot_acquired = True
# 不在本地等待图片硬并发槽位;请求一到就直接向上游提交。
if image_trace is not None:
image_trace["slot_wait_ms"] = 0
try:
# 上传图片 (如果有)
@@ -1202,9 +1178,7 @@ class GenerationHandler:
)
finally:
# 释放并发槽位
if self.concurrency_manager and (slot_reserved or slot_acquired):
await self.concurrency_manager.release_image(token.id)
pass
async def _handle_video_generation(
self,
@@ -1214,30 +1188,19 @@ class GenerationHandler:
prompt: str,
images: Optional[List[bytes]],
stream: bool,
slot_reserved: bool = False,
perf_trace: Optional[Dict[str, Any]] = None,
pending_token_state: Optional[Dict[str, bool]] = None
) -> AsyncGenerator:
"""处理视频生成 (异步轮询)"""
slot_acquired = False
video_trace: Optional[Dict[str, Any]] = None
if isinstance(perf_trace, dict):
video_trace = perf_trace.setdefault("video_generation", {})
video_trace["input_image_count"] = len(images) if images else 0
# 获取并发槽位
if self.concurrency_manager and not slot_reserved:
slot_ok, slot_wait_ms = await self.concurrency_manager.wait_acquire_video(
token.id,
timeout_seconds=config.flow_video_slot_wait_timeout
)
if video_trace is not None:
video_trace["slot_wait_ms"] = slot_wait_ms
if not slot_ok:
yield self._create_error_response("视频并发限制已达上限")
return
slot_acquired = True
# 不在本地等待视频硬并发槽位;请求一到就直接向上游提交。
if video_trace is not None:
video_trace["slot_wait_ms"] = 0
try:
# 获取模型类型和配置
@@ -1463,9 +1426,7 @@ class GenerationHandler:
yield chunk
finally:
# 释放并发槽位
if self.concurrency_manager and (slot_reserved or slot_acquired):
await self.concurrency_manager.release_video(token.id)
pass
async def _poll_video_result(
self,