diff --git a/src/services/flow_client.py b/src/services/flow_client.py index 13e6af4..b66dd8b 100644 --- a/src/services/flow_client.py +++ b/src/services/flow_client.py @@ -46,14 +46,8 @@ class FlowClient: "x-browser-year": "2026", "x-client-data": "CJS2yQEIpLbJAQipncoBCNj9ygEIlKHLAQiFoM0BGP6lzwE=" } - # 图片打码前置整形:限制“取验证码”阶段的并发波峰,不影响上游生成并发。 - self._image_launch_gate_lock = asyncio.Lock() - self._image_launch_gate_inflight: Dict[int, int] = {} - self._image_next_launch_at: Dict[int, float] = {} - # 视频打码前置整形:限制“取验证码”阶段的并发波峰,不影响上游生成并发。 - self._video_launch_gate_lock = asyncio.Lock() - self._video_launch_gate_inflight: Dict[int, int] = {} - self._video_next_launch_at: Dict[int, float] = {} + # 发车策略改为“请求到就发”: + # 不在 flow2api 本地对提交做批次整形或排队,避免把同批请求打成阶梯。 def _generate_user_agent(self, account_id: str = None) -> str: """基于账号ID生成固定的 User-Agent @@ -327,162 +321,29 @@ class FlowClient: "operation timed out", ]) - def _resolve_image_launch_soft_limit(self, token_image_concurrency: Optional[int]) -> Optional[int]: - """解析图片取验证码阶段软并发上限。None 表示不限制。""" - configured = config.flow_image_launch_soft_limit - if configured <= 0: - return None - - if token_image_concurrency and token_image_concurrency > 0: - return max(1, min(configured, int(token_image_concurrency))) - - return max(1, configured) - async def _acquire_image_launch_gate( self, token_id: Optional[int], token_image_concurrency: Optional[int], ) -> tuple[bool, int, int]: - """ - 控制图片取验证码阶段的并发波峰,减少同批请求互相挤压。 - - Returns: - (ok, queue_wait_ms, stagger_wait_ms) - """ - if token_id is None: - return True, 0, 0 - - wait_started = time.monotonic() - wait_timeout = config.flow_image_launch_wait_timeout - deadline = wait_started + wait_timeout - launch_limit = self._resolve_image_launch_soft_limit(token_image_concurrency) - stagger_seconds = max(0, config.flow_image_launch_stagger_ms) / 1000.0 - if str(getattr(config, "captcha_method", "")).strip().lower() in {"browser", "remote_browser"}: - # For headed browser token acquisition, avoid artificial staggering so the same batch can start closer together. - stagger_seconds = 0.0 - - while True: - now = time.monotonic() - stagger_wait = 0.0 - - async with self._image_launch_gate_lock: - inflight = self._image_launch_gate_inflight.get(token_id, 0) - if launch_limit is None or inflight < launch_limit: - self._image_launch_gate_inflight[token_id] = inflight + 1 - - if stagger_seconds > 0: - next_allowed = self._image_next_launch_at.get(token_id, now) - if next_allowed > now: - stagger_wait = next_allowed - now - self._image_next_launch_at[token_id] = max(now, next_allowed) + stagger_seconds - - queue_wait_ms = int((now - wait_started) * 1000) - if stagger_wait <= 0: - return True, queue_wait_ms, 0 - break - - if now >= deadline: - queue_wait_ms = int((now - wait_started) * 1000) - return False, queue_wait_ms, 0 - - await asyncio.sleep(0.05) - - await asyncio.sleep(stagger_wait) - stagger_wait_ms = int(stagger_wait * 1000) - queue_wait_ms = int((time.monotonic() - wait_started) * 1000) - stagger_wait_ms - if queue_wait_ms < 0: - queue_wait_ms = 0 - return True, queue_wait_ms, stagger_wait_ms + """图片请求不再做本地发车排队,直接进入取 token 并提交上游。""" + return True, 0, 0 async def _release_image_launch_gate(self, token_id: Optional[int]): - """释放图片取验证码阶段软并发占位。""" - if token_id is None: - return - - async with self._image_launch_gate_lock: - inflight = self._image_launch_gate_inflight.get(token_id, 0) - if inflight <= 0: - self._image_launch_gate_inflight[token_id] = 0 - return - self._image_launch_gate_inflight[token_id] = inflight - 1 - - def _resolve_video_launch_soft_limit(self, token_video_concurrency: Optional[int]) -> Optional[int]: - """解析视频取验证码阶段软并发上限。None 表示不限制。""" - configured = config.flow_video_launch_soft_limit - if configured <= 0: - return None - - if token_video_concurrency and token_video_concurrency > 0: - return max(1, min(configured, int(token_video_concurrency))) - - return max(1, configured) + """保留接口形状,当前无需释放任何本地发车状态。""" + return async def _acquire_video_launch_gate( self, token_id: Optional[int], token_video_concurrency: Optional[int], ) -> tuple[bool, int, int]: - """ - 控制视频取验证码阶段的并发波峰,减少同批请求互相挤压。 - - Returns: - (ok, queue_wait_ms, stagger_wait_ms) - """ - if token_id is None: - return True, 0, 0 - - wait_started = time.monotonic() - wait_timeout = config.flow_video_launch_wait_timeout - deadline = wait_started + wait_timeout - launch_limit = self._resolve_video_launch_soft_limit(token_video_concurrency) - stagger_seconds = max(0, config.flow_video_launch_stagger_ms) / 1000.0 - if str(getattr(config, "captcha_method", "")).strip().lower() in {"browser", "remote_browser"}: - stagger_seconds = 0.0 - - while True: - now = time.monotonic() - stagger_wait = 0.0 - - async with self._video_launch_gate_lock: - inflight = self._video_launch_gate_inflight.get(token_id, 0) - if launch_limit is None or inflight < launch_limit: - self._video_launch_gate_inflight[token_id] = inflight + 1 - - if stagger_seconds > 0: - next_allowed = self._video_next_launch_at.get(token_id, now) - if next_allowed > now: - stagger_wait = next_allowed - now - self._video_next_launch_at[token_id] = max(now, next_allowed) + stagger_seconds - - queue_wait_ms = int((now - wait_started) * 1000) - if stagger_wait <= 0: - return True, queue_wait_ms, 0 - break - - if now >= deadline: - queue_wait_ms = int((now - wait_started) * 1000) - return False, queue_wait_ms, 0 - - await asyncio.sleep(0.05) - - await asyncio.sleep(stagger_wait) - stagger_wait_ms = int(stagger_wait * 1000) - queue_wait_ms = int((time.monotonic() - wait_started) * 1000) - stagger_wait_ms - if queue_wait_ms < 0: - queue_wait_ms = 0 - return True, queue_wait_ms, stagger_wait_ms + """视频请求不再做本地发车排队,直接进入取 token 并提交上游。""" + return True, 0, 0 async def _release_video_launch_gate(self, token_id: Optional[int]): - """释放视频取验证码阶段软并发占位。""" - if token_id is None: - return - - async with self._video_launch_gate_lock: - inflight = self._video_launch_gate_inflight.get(token_id, 0) - if inflight <= 0: - self._video_launch_gate_inflight[token_id] = 0 - return - self._video_launch_gate_inflight[token_id] = inflight - 1 + """保留接口形状,当前无需释放任何本地发车状态。""" + return async def _make_image_generation_request( self, diff --git a/src/services/generation_handler.py b/src/services/generation_handler.py index 31dd6df..d58b6eb 100644 --- a/src/services/generation_handler.py +++ b/src/services/generation_handler.py @@ -714,7 +714,6 @@ class GenerationHandler: start_time = time.time() token = None generation_type = None - token_slot_reserved = False pending_token_state = {"active": False} request_id = f"gen-{int(start_time * 1000)}-{id(asyncio.current_task())}" perf_trace: Dict[str, Any] = { @@ -797,7 +796,6 @@ class GenerationHandler: yield self._create_error_response(error_msg) return - token_slot_reserved = False debug_logger.log_info(f"[GENERATION] 已选择Token: {token.id} ({token.email})") try: @@ -829,22 +827,16 @@ class GenerationHandler: generation_pipeline_started_at = time.time() if generation_type == "image": debug_logger.log_info(f"[GENERATION] 开始图片生成流程...") - slot_reserved_for_handler = token_slot_reserved - token_slot_reserved = False async for chunk in self._handle_image_generation( token, project_id, model_config, prompt, images, stream, - slot_reserved=slot_reserved_for_handler, perf_trace=perf_trace, pending_token_state=pending_token_state ): yield chunk else: # video debug_logger.log_info(f"[GENERATION] 开始视频生成流程...") - slot_reserved_for_handler = token_slot_reserved - token_slot_reserved = False async for chunk in self._handle_video_generation( token, project_id, model_config, prompt, images, stream, - slot_reserved=slot_reserved_for_handler, perf_trace=perf_trace, pending_token_state=pending_token_state ): @@ -940,11 +932,6 @@ class GenerationHandler: ) pending_token_state["active"] = False - if token_slot_reserved and token and self.concurrency_manager: - if generation_type == "image": - await self.concurrency_manager.release_image(token.id) - elif generation_type == "video": - await self.concurrency_manager.release_video(token.id) def _get_no_token_error_message(self, generation_type: str) -> str: """获取无可用Token时的详细错误信息""" @@ -961,30 +948,19 @@ class GenerationHandler: prompt: str, images: Optional[List[bytes]], stream: bool, - slot_reserved: bool = False, perf_trace: Optional[Dict[str, Any]] = None, pending_token_state: Optional[Dict[str, bool]] = None ) -> AsyncGenerator: """处理图片生成 (同步返回)""" - slot_acquired = False image_trace: Optional[Dict[str, Any]] = None if isinstance(perf_trace, dict): image_trace = perf_trace.setdefault("image_generation", {}) image_trace["input_image_count"] = len(images) if images else 0 - # 获取并发槽位 - if self.concurrency_manager and not slot_reserved: - slot_ok, slot_wait_ms = await self.concurrency_manager.wait_acquire_image( - token.id, - timeout_seconds=config.flow_image_slot_wait_timeout - ) - if image_trace is not None: - image_trace["slot_wait_ms"] = slot_wait_ms - if not slot_ok: - yield self._create_error_response("图片并发限制已达上限") - return - slot_acquired = True + # 不在本地等待图片硬并发槽位;请求一到就直接向上游提交。 + if image_trace is not None: + image_trace["slot_wait_ms"] = 0 try: # 上传图片 (如果有) @@ -1202,9 +1178,7 @@ class GenerationHandler: ) finally: - # 释放并发槽位 - if self.concurrency_manager and (slot_reserved or slot_acquired): - await self.concurrency_manager.release_image(token.id) + pass async def _handle_video_generation( self, @@ -1214,30 +1188,19 @@ class GenerationHandler: prompt: str, images: Optional[List[bytes]], stream: bool, - slot_reserved: bool = False, perf_trace: Optional[Dict[str, Any]] = None, pending_token_state: Optional[Dict[str, bool]] = None ) -> AsyncGenerator: """处理视频生成 (异步轮询)""" - slot_acquired = False video_trace: Optional[Dict[str, Any]] = None if isinstance(perf_trace, dict): video_trace = perf_trace.setdefault("video_generation", {}) video_trace["input_image_count"] = len(images) if images else 0 - # 获取并发槽位 - if self.concurrency_manager and not slot_reserved: - slot_ok, slot_wait_ms = await self.concurrency_manager.wait_acquire_video( - token.id, - timeout_seconds=config.flow_video_slot_wait_timeout - ) - if video_trace is not None: - video_trace["slot_wait_ms"] = slot_wait_ms - if not slot_ok: - yield self._create_error_response("视频并发限制已达上限") - return - slot_acquired = True + # 不在本地等待视频硬并发槽位;请求一到就直接向上游提交。 + if video_trace is not None: + video_trace["slot_wait_ms"] = 0 try: # 获取模型类型和配置 @@ -1463,9 +1426,7 @@ class GenerationHandler: yield chunk finally: - # 释放并发槽位 - if self.concurrency_manager and (slot_reserved or slot_acquired): - await self.concurrency_manager.release_video(token.id) + pass async def _poll_video_result( self,