fix(video): 兼容 Flow 视频媒体协议更新

This commit is contained in:
genz27
2026-05-17 21:19:25 +08:00
parent e966492244
commit ab3b2c9161
3 changed files with 375 additions and 27 deletions

View File

@@ -854,6 +854,7 @@ class FlowClient:
new_url = f"{self.api_base_url}/flow/uploadImage"
normalized_project_id = str(project_id or "").strip()
new_client_context = {
"sessionId": self._generate_session_id(),
"tool": "PINHOLE"
}
if normalized_project_id:
@@ -909,7 +910,7 @@ class FlowClient:
use_media_proxy=True
)
media_id = (
new_result.get("media", {}).get("name")
self._extract_media_name(new_result.get("media"))
or new_result.get("mediaGenerationId", {}).get("mediaGenerationId")
)
if media_id:
@@ -1235,6 +1236,20 @@ class FlowClient:
# ========== 视频生成 (使用AT) - 异步返回 ==========
def _extract_media_name(self, media: Any) -> Optional[str]:
"""从新版 media 对象或数组中提取 media id。"""
if isinstance(media, list):
for item in media:
media_name = self._extract_media_name(item)
if media_name:
return media_name
return None
if isinstance(media, dict):
name = media.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _build_video_text_input(self, prompt: str, use_v2_model_config: bool = False) -> Dict[str, Any]:
if use_v2_model_config:
return {
@@ -1248,6 +1263,204 @@ class FlowClient:
"prompt": prompt
}
def _build_video_media_generation_context(self, batch_id: Optional[str] = None) -> Dict[str, Any]:
return {
"batchId": batch_id or str(uuid.uuid4()),
"audioFailurePreference": "BLOCK_SILENCED_VIDEOS",
}
def _find_nested_string(self, value: Any, keys: tuple[str, ...]) -> Optional[str]:
if isinstance(value, dict):
for key in keys:
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
for candidate in value.values():
found = self._find_nested_string(candidate, keys)
if found:
return found
elif isinstance(value, list):
for item in value:
found = self._find_nested_string(item, keys)
if found:
return found
return None
def _extract_video_status_from_media(self, media: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]:
status_block = (
media.get("mediaMetadata", {}).get("mediaStatus", {})
or media.get("mediaStatus", {})
or {}
)
status = (
status_block.get("mediaGenerationStatus")
or status_block.get("status")
or media.get("status")
)
return status, status_block if isinstance(status_block, dict) else {}
def _extract_video_url_from_media(self, media: Dict[str, Any]) -> Optional[str]:
    """Pick a video URL out of a media item.

    Three lookups are made with ``_find_nested_string``, in priority order:
    the dedicated URL keys inside ``media["video"]``, the same keys anywhere
    in ``media``, and finally the generic ``uri``/``url`` keys inside
    ``media["video"]``.

    Args:
        media: One media entry from a response payload.

    Returns:
        The first lookup result that starts with ``http://``, ``https://``
        or ``/``; ``None`` when no lookup produced such a string.
    """
    video_section = media.get("video")
    if not isinstance(video_section, dict):
        video_section = {}

    dedicated_keys = ("fifeUrl", "videoUrl", "outputUri", "downloadUri")
    lookups = (
        (video_section, dedicated_keys),
        (media, dedicated_keys),
        (video_section, ("uri", "url")),
    )
    # All lookups are resolved up front, then filtered in priority order.
    found = [self._find_nested_string(scope, names) for scope, names in lookups]

    accepted_prefixes = ("http://", "https://", "/")
    for url in found:
        if url and url.startswith(accepted_prefixes):
            return url
    return None
def _media_to_video_operation(
self,
media: Dict[str, Any],
fallback_project_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
if not isinstance(media, dict):
return None
media_name = self._extract_media_name(media)
video = media.get("video") if isinstance(media.get("video"), dict) else {}
video_operation = video.get("operation") if isinstance(video.get("operation"), dict) else {}
operation_name = (
video_operation.get("name")
or self._find_nested_string(video_operation, ("name",))
or media_name
)
if not operation_name:
return None
project_id = media.get("projectId") or fallback_project_id
status, status_block = self._extract_video_status_from_media(media)
operation: Dict[str, Any] = {
"operation": {
"name": operation_name,
},
"status": status or "MEDIA_GENERATION_STATUS_PENDING",
}
if media_name:
operation["mediaName"] = media_name
if project_id:
operation["projectId"] = project_id
scene_id = (
media.get("sceneId")
or media.get("workflowStepId")
or video_operation.get("sceneId")
)
if scene_id:
operation["sceneId"] = scene_id
video_url = self._extract_video_url_from_media(media)
aspect_ratio = (
self._find_nested_string(video, ("aspectRatio", "videoAspectRatio"))
or self._find_nested_string(media.get("mediaMetadata", {}), ("videoAspectRatio", "aspectRatio"))
)
video_metadata: Dict[str, Any] = {}
if video_url:
video_metadata["fifeUrl"] = video_url
if media_name:
video_metadata["mediaGenerationId"] = media_name
if aspect_ratio:
video_metadata["aspectRatio"] = aspect_ratio
if video_metadata:
operation["operation"]["metadata"] = {"video": video_metadata}
error = status_block.get("error") if isinstance(status_block, dict) else None
if isinstance(error, dict):
operation["operation"]["error"] = error
return operation
def _merge_video_operations_with_media(
self,
operations: List[Dict[str, Any]],
media_operations: List[Dict[str, Any]],
fallback_project_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
media_by_name: Dict[str, Dict[str, Any]] = {}
for item in media_operations:
media_name = item.get("mediaName") or (item.get("operation") or {}).get("name")
if media_name:
media_by_name[media_name] = item
merged: List[Dict[str, Any]] = []
for raw_operation in operations:
operation = dict(raw_operation) if isinstance(raw_operation, dict) else {}
operation_body = dict(operation.get("operation") or {})
operation["operation"] = operation_body
name = operation_body.get("name") or operation.get("mediaName")
media_operation = media_by_name.get(name) if name else None
if media_operation:
operation.setdefault("mediaName", media_operation.get("mediaName"))
operation.setdefault("projectId", media_operation.get("projectId"))
operation.setdefault("status", media_operation.get("status"))
operation.setdefault("sceneId", media_operation.get("sceneId"))
if "metadata" not in operation_body and (media_operation.get("operation") or {}).get("metadata"):
operation_body["metadata"] = (media_operation.get("operation") or {}).get("metadata")
if "error" not in operation_body and (media_operation.get("operation") or {}).get("error"):
operation_body["error"] = (media_operation.get("operation") or {}).get("error")
elif fallback_project_id:
operation.setdefault("projectId", fallback_project_id)
merged.append(operation)
return merged
def _normalize_video_generation_response(
self,
result: Dict[str, Any],
fallback_project_id: Optional[str] = None,
) -> Dict[str, Any]:
if not isinstance(result, dict):
return result
normalized = dict(result)
media_items = normalized.get("media")
media_operations: List[Dict[str, Any]] = []
if isinstance(media_items, list):
for media in media_items:
operation = self._media_to_video_operation(media, fallback_project_id=fallback_project_id)
if operation:
media_operations.append(operation)
operations = normalized.get("operations")
if isinstance(operations, list) and operations:
normalized["operations"] = self._merge_video_operations_with_media(
operations,
media_operations,
fallback_project_id=fallback_project_id,
)
elif media_operations:
normalized["operations"] = media_operations
return normalized
def _operations_to_media_refs(
self,
operations: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
media_refs: List[Dict[str, str]] = []
for operation in operations or []:
if not isinstance(operation, dict):
continue
operation_body = operation.get("operation") or {}
media_name = (
operation.get("mediaName")
or operation.get("name")
or operation_body.get("name")
)
project_id = (
operation.get("projectId")
or operation.get("project_id")
or operation_body.get("projectId")
)
if isinstance(media_name, str) and media_name.strip() and isinstance(project_id, str) and project_id.strip():
media_refs.append({
"name": media_name.strip(),
"projectId": project_id.strip(),
})
return media_refs
async def generate_video_text(
self,
at: str,
@@ -1345,9 +1558,7 @@ class FlowClient:
"requests": [request_data]
}
if use_v2_model_config:
json_data["mediaGenerationContext"] = {
"batchId": str(uuid.uuid4())
}
json_data["mediaGenerationContext"] = self._build_video_media_generation_context()
json_data["useV2ModelConfig"] = True
try:
@@ -1357,7 +1568,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -1447,9 +1658,7 @@ class FlowClient:
scene_id = str(uuid.uuid4())
json_data = {
"mediaGenerationContext": {
"batchId": batch_id
},
"mediaGenerationContext": self._build_video_media_generation_context(batch_id),
"clientContext": {
"recaptchaContext": {
"token": recaptcha_token,
@@ -1486,7 +1695,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -1606,9 +1815,7 @@ class FlowClient:
"requests": [request_data]
}
if use_v2_model_config:
json_data["mediaGenerationContext"] = {
"batchId": str(uuid.uuid4())
}
json_data["mediaGenerationContext"] = self._build_video_media_generation_context()
json_data["useV2ModelConfig"] = True
try:
@@ -1618,7 +1825,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -1734,9 +1941,7 @@ class FlowClient:
"requests": [request_data]
}
if use_v2_model_config:
json_data["mediaGenerationContext"] = {
"batchId": str(uuid.uuid4())
}
json_data["mediaGenerationContext"] = self._build_video_media_generation_context()
json_data["useV2ModelConfig"] = True
try:
@@ -1746,7 +1951,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -1846,9 +2051,7 @@ class FlowClient:
"tool": "PINHOLE",
"userPaygateTier": user_paygate_tier
},
"mediaGenerationContext": {
"batchId": str(uuid.uuid4())
},
"mediaGenerationContext": self._build_video_media_generation_context(),
"requests": [{
"aspectRatio": aspect_ratio,
"seed": random.randint(1, 99999),
@@ -1880,7 +2083,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -2131,7 +2334,9 @@ class FlowClient:
"token": recaptcha_token,
"applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB"
},
"sessionId": session_id
"sessionId": session_id,
"projectId": project_id,
"tool": "PINHOLE"
}
}
@@ -2142,7 +2347,7 @@ class FlowClient:
at=at,
timeout=self._get_video_submit_timeout()
)
return result
return self._normalize_video_generation_response(result, fallback_project_id=project_id)
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
@@ -2183,21 +2388,32 @@ class FlowClient:
"""
url = f"{self.api_base_url}/video:batchCheckAsyncVideoGenerationStatus"
json_data = {
"operations": operations
}
media_refs = self._operations_to_media_refs(operations)
json_data = {"media": media_refs} if media_refs else {"operations": operations}
max_retries = config.flow_max_retries
last_error: Optional[Exception] = None
for retry_attempt in range(max_retries):
try:
return await self._make_video_api_request(
result = await self._make_video_api_request(
url=url,
json_data=json_data,
at=at,
timeout=self._get_video_poll_timeout()
)
return self._normalize_video_generation_response(result)
except Exception as e:
if media_refs:
try:
result = await self._make_video_api_request(
url=url,
json_data={"operations": operations},
at=at,
timeout=self._get_video_poll_timeout()
)
return self._normalize_video_generation_response(result)
except Exception:
pass
last_error = e
retry_reason = self._get_retry_reason(str(e))
if retry_reason and retry_attempt < max_retries - 1:

View File

@@ -37,6 +37,36 @@ class FlowClientUploadImageTests(unittest.IsolatedAsyncioTestCase):
request_calls[0]["json_data"]["clientContext"]["projectId"],
"project-123",
)
self.assertIn("sessionId", request_calls[0]["json_data"]["clientContext"])
async def test_project_scoped_upload_accepts_media_list_response(self):
    """upload_image returns the media id when ``media`` comes back as a list."""
    client = FlowClient(proxy_manager=None)
    recorded_requests = []

    async def fake_make_request(**kwargs):
        # Record the call, then answer with the list-shaped media payload.
        recorded_requests.append(kwargs)
        uploaded = {
            "name": "new-media-id",
            "projectId": "project-123",
        }
        return {"media": [uploaded]}

    client._make_request = AsyncMock(side_effect=fake_make_request)

    media_id = await client.upload_image(
        at="test-at",
        image_bytes=JPEG_BYTES,
        aspect_ratio="IMAGE_ASPECT_RATIO_LANDSCAPE",
        project_id="project-123",
    )

    self.assertEqual(media_id, "new-media-id")
    # Exactly one request, and it went to the project-scoped upload endpoint.
    self.assertEqual(len(recorded_requests), 1)
    only_request = recorded_requests[0]
    self.assertTrue(only_request["url"].endswith("/flow/uploadImage"))
async def test_project_scoped_upload_does_not_fallback_to_legacy_endpoint(self):
client = FlowClient(proxy_manager=None)

View File

@@ -174,6 +174,108 @@ class VeoLiteFlowClientTests(unittest.IsolatedAsyncioTestCase):
)
self.assertNotIn("prompt", request_data["textInput"])
self.assertEqual(request_data["videoModelKey"], "veo_3_1_t2v_lite")
self.assertEqual(
json_data["mediaGenerationContext"]["audioFailurePreference"],
"BLOCK_SILENCED_VIDEOS",
)
async def test_generate_video_text_normalizes_media_only_create_response(self):
    """A create response carrying only ``media`` still yields ``operations``."""
    seen = {}

    async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs):
        seen["json_data"] = json_data
        workflow = {
            "name": "workflow-1",
            "metadata": {"primaryMediaId": "media-1"},
            "projectId": "project-1",
        }
        pending_media = {
            "name": "media-1",
            "projectId": "project-1",
            "mediaMetadata": {
                "mediaStatus": {
                    "mediaGenerationStatus": "MEDIA_GENERATION_STATUS_PENDING"
                }
            },
        }
        # No "operations" key here: only workflows and media.
        return {
            "remainingCredits": 30,
            "workflows": [workflow],
            "media": [pending_media],
        }

    self.client._make_request = AsyncMock(side_effect=fake_make_request)

    result = await self.client.generate_video_text(
        at="at-token",
        project_id="project-1",
        prompt="猫猫",
        model_key="veo_3_1_t2v_lite",
        aspect_ratio="VIDEO_ASPECT_RATIO_LANDSCAPE",
        use_v2_model_config=True,
    )

    sent_context = seen["json_data"]["mediaGenerationContext"]
    self.assertEqual(
        sent_context["audioFailurePreference"],
        "BLOCK_SILENCED_VIDEOS",
    )

    first_operation = result["operations"][0]
    self.assertEqual(first_operation["operation"]["name"], "media-1")
    self.assertEqual(first_operation["projectId"], "project-1")
    self.assertEqual(
        first_operation["status"],
        "MEDIA_GENERATION_STATUS_PENDING",
    )
async def test_check_video_status_uses_media_payload_and_normalizes_response(self):
    """check_video_status sends media refs and maps the media reply to operations."""
    seen = {}
    expected_url = (
        "https://flow-content.google/video/11111111-1111-1111-1111-111111111111?token=abc"
    )

    async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs):
        seen["json_data"] = json_data
        finished_media = {
            "name": "media-1",
            "projectId": "project-1",
            "mediaMetadata": {
                "mediaStatus": {
                    "mediaGenerationStatus": "MEDIA_GENERATION_STATUS_SUCCESSFUL"
                }
            },
            "video": {
                "fifeUrl": "https://flow-content.google/video/11111111-1111-1111-1111-111111111111?token=abc",
                "generatedVideo": {
                    "aspectRatio": "VIDEO_ASPECT_RATIO_LANDSCAPE"
                },
            },
        }
        return {"media": [finished_media]}

    self.client._make_request = AsyncMock(side_effect=fake_make_request)

    polled_operations = [
        {
            "operation": {"name": "media-1"},
            "projectId": "project-1",
        }
    ]
    result = await self.client.check_video_status(
        at="at-token",
        operations=polled_operations,
    )

    # The request body carries media references rather than raw operations.
    self.assertEqual(
        seen["json_data"],
        {"media": [{"name": "media-1", "projectId": "project-1"}]},
    )

    first_operation = result["operations"][0]
    self.assertEqual(first_operation["operation"]["name"], "media-1")
    self.assertEqual(first_operation["status"], "MEDIA_GENERATION_STATUS_SUCCESSFUL")
    self.assertEqual(
        first_operation["operation"]["metadata"]["video"]["fifeUrl"],
        expected_url,
    )
async def test_generate_video_start_end_uses_v2_payload_for_interpolation_lite(self):
captured = {}