diff --git a/src/services/flow_client.py b/src/services/flow_client.py index 0431bf3..615d0fd 100644 --- a/src/services/flow_client.py +++ b/src/services/flow_client.py @@ -854,6 +854,7 @@ class FlowClient: new_url = f"{self.api_base_url}/flow/uploadImage" normalized_project_id = str(project_id or "").strip() new_client_context = { + "sessionId": self._generate_session_id(), "tool": "PINHOLE" } if normalized_project_id: @@ -909,7 +910,7 @@ class FlowClient: use_media_proxy=True ) media_id = ( - new_result.get("media", {}).get("name") + self._extract_media_name(new_result.get("media")) or new_result.get("mediaGenerationId", {}).get("mediaGenerationId") ) if media_id: @@ -1235,6 +1236,20 @@ class FlowClient: # ========== 视频生成 (使用AT) - 异步返回 ========== + def _extract_media_name(self, media: Any) -> Optional[str]: + """从新版 media 对象或数组中提取 media id。""" + if isinstance(media, list): + for item in media: + media_name = self._extract_media_name(item) + if media_name: + return media_name + return None + if isinstance(media, dict): + name = media.get("name") + if isinstance(name, str) and name.strip(): + return name.strip() + return None + def _build_video_text_input(self, prompt: str, use_v2_model_config: bool = False) -> Dict[str, Any]: if use_v2_model_config: return { @@ -1248,6 +1263,204 @@ class FlowClient: "prompt": prompt } + def _build_video_media_generation_context(self, batch_id: Optional[str] = None) -> Dict[str, Any]: + return { + "batchId": batch_id or str(uuid.uuid4()), + "audioFailurePreference": "BLOCK_SILENCED_VIDEOS", + } + + def _find_nested_string(self, value: Any, keys: tuple[str, ...]) -> Optional[str]: + if isinstance(value, dict): + for key in keys: + candidate = value.get(key) + if isinstance(candidate, str) and candidate.strip(): + return candidate.strip() + for candidate in value.values(): + found = self._find_nested_string(candidate, keys) + if found: + return found + elif isinstance(value, list): + for item in value: + found = self._find_nested_string(item, keys) + if found: + return found + return None + + def _extract_video_status_from_media(self, media: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]: + status_block = ( + media.get("mediaMetadata", {}).get("mediaStatus", {}) + or media.get("mediaStatus", {}) + or {} + ) + status = ( + status_block.get("mediaGenerationStatus") + or status_block.get("status") + or media.get("status") + ) + return status, status_block if isinstance(status_block, dict) else {} + + def _extract_video_url_from_media(self, media: Dict[str, Any]) -> Optional[str]: + video = media.get("video") if isinstance(media.get("video"), dict) else {} + candidates = [ + self._find_nested_string(video, ("fifeUrl", "videoUrl", "outputUri", "downloadUri")), + self._find_nested_string(media, ("fifeUrl", "videoUrl", "outputUri", "downloadUri")), + self._find_nested_string(video, ("uri", "url")), + ] + for candidate in candidates: + if candidate and (candidate.startswith("http://") or candidate.startswith("https://") or candidate.startswith("/")): + return candidate + return None + + def _media_to_video_operation( + self, + media: Dict[str, Any], + fallback_project_id: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + if not isinstance(media, dict): + return None + + media_name = self._extract_media_name(media) + video = media.get("video") if isinstance(media.get("video"), dict) else {} + video_operation = video.get("operation") if isinstance(video.get("operation"), dict) else {} + operation_name = ( + video_operation.get("name") + or self._find_nested_string(video_operation, ("name",)) + or media_name + ) + if not operation_name: + return None + + project_id = media.get("projectId") or fallback_project_id + status, status_block = self._extract_video_status_from_media(media) + operation: Dict[str, Any] = { + "operation": { + "name": operation_name, + }, + "status": status or "MEDIA_GENERATION_STATUS_PENDING", + } + if media_name: + operation["mediaName"] = media_name + if project_id: + operation["projectId"] = project_id + + scene_id = ( + media.get("sceneId") + or media.get("workflowStepId") + or video_operation.get("sceneId") + ) + if scene_id: + operation["sceneId"] = scene_id + + video_url = self._extract_video_url_from_media(media) + aspect_ratio = ( + self._find_nested_string(video, ("aspectRatio", "videoAspectRatio")) + or self._find_nested_string(media.get("mediaMetadata", {}), ("videoAspectRatio", "aspectRatio")) + ) + video_metadata: Dict[str, Any] = {} + if video_url: + video_metadata["fifeUrl"] = video_url + if media_name: + video_metadata["mediaGenerationId"] = media_name + if aspect_ratio: + video_metadata["aspectRatio"] = aspect_ratio + if video_metadata: + operation["operation"]["metadata"] = {"video": video_metadata} + + error = status_block.get("error") if isinstance(status_block, dict) else None + if isinstance(error, dict): + operation["operation"]["error"] = error + + return operation + + def _merge_video_operations_with_media( + self, + operations: List[Dict[str, Any]], + media_operations: List[Dict[str, Any]], + fallback_project_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + media_by_name: Dict[str, Dict[str, Any]] = {} + for item in media_operations: + media_name = item.get("mediaName") or (item.get("operation") or {}).get("name") + if media_name: + media_by_name[media_name] = item + + merged: List[Dict[str, Any]] = [] + for raw_operation in operations: + operation = dict(raw_operation) if isinstance(raw_operation, dict) else {} + operation_body = dict(operation.get("operation") or {}) + operation["operation"] = operation_body + name = operation_body.get("name") or operation.get("mediaName") + media_operation = media_by_name.get(name) if name else None + if media_operation: + operation.setdefault("mediaName", media_operation.get("mediaName")) + operation.setdefault("projectId", media_operation.get("projectId")) + operation.setdefault("status", media_operation.get("status")) + operation.setdefault("sceneId", media_operation.get("sceneId")) + if "metadata" not in operation_body and (media_operation.get("operation") or {}).get("metadata"): + operation_body["metadata"] = (media_operation.get("operation") or {}).get("metadata") + if "error" not in operation_body and (media_operation.get("operation") or {}).get("error"): + operation_body["error"] = (media_operation.get("operation") or {}).get("error") + elif fallback_project_id: + operation.setdefault("projectId", fallback_project_id) + merged.append(operation) + + return merged + + def _normalize_video_generation_response( + self, + result: Dict[str, Any], + fallback_project_id: Optional[str] = None, + ) -> Dict[str, Any]: + if not isinstance(result, dict): + return result + + normalized = dict(result) + media_items = normalized.get("media") + media_operations: List[Dict[str, Any]] = [] + if isinstance(media_items, list): + for media in media_items: + operation = self._media_to_video_operation(media, fallback_project_id=fallback_project_id) + if operation: + media_operations.append(operation) + + operations = normalized.get("operations") + if isinstance(operations, list) and operations: + normalized["operations"] = self._merge_video_operations_with_media( + operations, + media_operations, + fallback_project_id=fallback_project_id, + ) + elif media_operations: + normalized["operations"] = media_operations + + return normalized + + def _operations_to_media_refs( + self, + operations: List[Dict[str, Any]], + ) -> List[Dict[str, str]]: + media_refs: List[Dict[str, str]] = [] + for operation in operations or []: + if not isinstance(operation, dict): + continue + operation_body = operation.get("operation") or {} + media_name = ( + operation.get("mediaName") + or operation.get("name") + or operation_body.get("name") + ) + project_id = ( + operation.get("projectId") + or operation.get("project_id") + or operation_body.get("projectId") + ) + if isinstance(media_name, str) and media_name.strip() and isinstance(project_id, str) and project_id.strip(): + media_refs.append({ + "name": media_name.strip(), + "projectId": project_id.strip(), + }) + return media_refs + async def generate_video_text( self, at: str, @@ -1345,9 +1558,7 @@ class FlowClient: "requests": [request_data] } if use_v2_model_config: - json_data["mediaGenerationContext"] = { - "batchId": str(uuid.uuid4()) - } + json_data["mediaGenerationContext"] = self._build_video_media_generation_context() json_data["useV2ModelConfig"] = True try: @@ -1357,7 +1568,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -1447,9 +1658,7 @@ class FlowClient: scene_id = str(uuid.uuid4()) json_data = { - "mediaGenerationContext": { - "batchId": batch_id - }, + "mediaGenerationContext": self._build_video_media_generation_context(batch_id), "clientContext": { "recaptchaContext": { "token": recaptcha_token, @@ -1486,7 +1695,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -1606,9 +1815,7 @@ class FlowClient: "requests": [request_data] } if use_v2_model_config: - json_data["mediaGenerationContext"] = { - "batchId": str(uuid.uuid4()) - } + json_data["mediaGenerationContext"] = self._build_video_media_generation_context() json_data["useV2ModelConfig"] = True try: @@ -1618,7 +1825,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -1734,9 +1941,7 @@ class FlowClient: "requests": [request_data] } if use_v2_model_config: - json_data["mediaGenerationContext"] = { - "batchId": str(uuid.uuid4()) - } + json_data["mediaGenerationContext"] = self._build_video_media_generation_context() json_data["useV2ModelConfig"] = True try: @@ -1746,7 +1951,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -1846,9 +2051,7 @@ class FlowClient: "tool": "PINHOLE", "userPaygateTier": user_paygate_tier }, - "mediaGenerationContext": { - "batchId": str(uuid.uuid4()) - }, + "mediaGenerationContext": self._build_video_media_generation_context(), "requests": [{ "aspectRatio": aspect_ratio, "seed": random.randint(1, 99999), @@ -1880,7 +2083,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -2131,7 +2334,9 @@ class FlowClient: "token": recaptcha_token, "applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB" }, - "sessionId": session_id + "sessionId": session_id, + "projectId": project_id, + "tool": "PINHOLE" } } @@ -2142,7 +2347,7 @@ class FlowClient: at=at, timeout=self._get_video_submit_timeout() ) - return result + return self._normalize_video_generation_response(result, fallback_project_id=project_id) except Exception as e: last_error = e should_retry = await self._handle_retryable_generation_error( @@ -2183,21 +2388,32 @@ class FlowClient: """ url = f"{self.api_base_url}/video:batchCheckAsyncVideoGenerationStatus" - json_data = { - "operations": operations - } + media_refs = self._operations_to_media_refs(operations) + json_data = {"media": media_refs} if media_refs else {"operations": operations} max_retries = config.flow_max_retries last_error: Optional[Exception] = None for retry_attempt in range(max_retries): try: - return await self._make_video_api_request( + result = await self._make_video_api_request( url=url, json_data=json_data, at=at, timeout=self._get_video_poll_timeout() ) + return self._normalize_video_generation_response(result) except Exception as e: + if media_refs: + try: + result = await self._make_video_api_request( + url=url, + json_data={"operations": operations}, + at=at, + timeout=self._get_video_poll_timeout() + ) + return self._normalize_video_generation_response(result) + except Exception: + pass last_error = e retry_reason = self._get_retry_reason(str(e)) if retry_reason and retry_attempt < max_retries - 1: diff --git a/tests/test_flow_client_upload.py b/tests/test_flow_client_upload.py index a9d6f5f..2d20a1d 100644 --- a/tests/test_flow_client_upload.py +++ b/tests/test_flow_client_upload.py @@ -37,6 +37,36 @@ class FlowClientUploadImageTests(unittest.IsolatedAsyncioTestCase): request_calls[0]["json_data"]["clientContext"]["projectId"], "project-123", ) + self.assertIn("sessionId", request_calls[0]["json_data"]["clientContext"]) + + async def test_project_scoped_upload_accepts_media_list_response(self): + client = FlowClient(proxy_manager=None) + + request_calls = [] + + async def fake_make_request(**kwargs): + request_calls.append(kwargs) + return { + "media": [ + { + "name": "new-media-id", + "projectId": "project-123", + } + ] + } + + client._make_request = AsyncMock(side_effect=fake_make_request) + + media_id = await client.upload_image( + at="test-at", + image_bytes=JPEG_BYTES, + aspect_ratio="IMAGE_ASPECT_RATIO_LANDSCAPE", + project_id="project-123", + ) + + self.assertEqual(media_id, "new-media-id") + self.assertEqual(len(request_calls), 1) + self.assertTrue(request_calls[0]["url"].endswith("/flow/uploadImage")) async def test_project_scoped_upload_does_not_fallback_to_legacy_endpoint(self): client = FlowClient(proxy_manager=None) diff --git a/tests/test_veo_lite_support.py b/tests/test_veo_lite_support.py index 022f5ac..c9621df 100644 --- a/tests/test_veo_lite_support.py +++ b/tests/test_veo_lite_support.py @@ -174,6 +174,108 @@ class VeoLiteFlowClientTests(unittest.IsolatedAsyncioTestCase): ) self.assertNotIn("prompt", request_data["textInput"]) self.assertEqual(request_data["videoModelKey"], "veo_3_1_t2v_lite") + self.assertEqual( + json_data["mediaGenerationContext"]["audioFailurePreference"], + "BLOCK_SILENCED_VIDEOS", + ) + + async def test_generate_video_text_normalizes_media_only_create_response(self): + captured = {} + + async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs): + captured["json_data"] = json_data + return { + "remainingCredits": 30, + "workflows": [ + { + "name": "workflow-1", + "metadata": {"primaryMediaId": "media-1"}, + "projectId": "project-1", + } + ], + "media": [ + { + "name": "media-1", + "projectId": "project-1", + "mediaMetadata": { + "mediaStatus": { + "mediaGenerationStatus": "MEDIA_GENERATION_STATUS_PENDING" + } + }, + } + ], + } + + self.client._make_request = AsyncMock(side_effect=fake_make_request) + + result = await self.client.generate_video_text( + at="at-token", + project_id="project-1", + prompt="猫猫", + model_key="veo_3_1_t2v_lite", + aspect_ratio="VIDEO_ASPECT_RATIO_LANDSCAPE", + use_v2_model_config=True, + ) + + self.assertEqual( + captured["json_data"]["mediaGenerationContext"]["audioFailurePreference"], + "BLOCK_SILENCED_VIDEOS", + ) + self.assertEqual(result["operations"][0]["operation"]["name"], "media-1") + self.assertEqual(result["operations"][0]["projectId"], "project-1") + self.assertEqual( + result["operations"][0]["status"], + "MEDIA_GENERATION_STATUS_PENDING", + ) + + async def test_check_video_status_uses_media_payload_and_normalizes_response(self): + captured = {} + + async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs): + captured["json_data"] = json_data + return { + "media": [ + { + "name": "media-1", + "projectId": "project-1", + "mediaMetadata": { + "mediaStatus": { + "mediaGenerationStatus": "MEDIA_GENERATION_STATUS_SUCCESSFUL" + } + }, + "video": { + "fifeUrl": "https://flow-content.google/video/11111111-1111-1111-1111-111111111111?token=abc", + "generatedVideo": { + "aspectRatio": "VIDEO_ASPECT_RATIO_LANDSCAPE" + }, + }, + } + ] + } + + self.client._make_request = AsyncMock(side_effect=fake_make_request) + + result = await self.client.check_video_status( + at="at-token", + operations=[ + { + "operation": {"name": "media-1"}, + "projectId": "project-1", + } + ], + ) + + self.assertEqual( + captured["json_data"], + {"media": [{"name": "media-1", "projectId": "project-1"}]}, + ) + operation = result["operations"][0] + self.assertEqual(operation["operation"]["name"], "media-1") + self.assertEqual(operation["status"], "MEDIA_GENERATION_STATUS_SUCCESSFUL") + self.assertEqual( + operation["operation"]["metadata"]["video"]["fifeUrl"], + "https://flow-content.google/video/11111111-1111-1111-1111-111111111111?token=abc", + ) async def test_generate_video_start_end_uses_v2_payload_for_interpolation_lite(self): captured = {}