diff --git a/src-tauri/src/proxy/providers/streaming.rs b/src-tauri/src/proxy/providers/streaming.rs index 725255d1..e923537f 100644 --- a/src-tauri/src/proxy/providers/streaming.rs +++ b/src-tauri/src/proxy/providers/streaming.rs @@ -112,6 +112,28 @@ fn build_anthropic_usage_json(usage: &Usage) -> Value { usage_json } +fn default_anthropic_usage_json() -> Value { + json!({ + "input_tokens": 0, + "output_tokens": 0 + }) +} + +fn build_message_delta_event(stop_reason: Option<String>, usage_json: Option<Value>) -> Value { + let usage = usage_json + .filter(|usage| usage.is_object()) + .unwrap_or_else(default_anthropic_usage_json); + + json!({ + "type": "message_delta", + "delta": { + "stop_reason": stop_reason, + "stop_sequence": null + }, + "usage": usage + }) +} + /// 创建 Anthropic SSE 流 pub fn create_anthropic_sse_stream( stream: impl Stream<Item = Result<Bytes, reqwest::Error>> + Send + 'static, @@ -157,16 +179,7 @@ pub fn create_anthropic_sse_stream( // 流正常结束,发出缓存的 message_delta(含完整 usage)。 if let Some((stop_reason, usage_json)) = pending_message_delta.take() { - let mut event = json!({ - "type": "message_delta", - "delta": { - "stop_reason": stop_reason, - "stop_sequence": null - } - }); - if let Some(uj) = usage_json { - event["usage"] = uj; - } + let event = build_message_delta_event(stop_reason, usage_json); let sse_data = format!("event: message_delta\ndata: {}\n\n", serde_json::to_string(&event).unwrap_or_default()); log::debug!("[Claude/OpenRouter] >>> Anthropic SSE: message_delta (from pending)"); @@ -616,16 +629,7 @@ pub fn create_anthropic_sse_stream( let emitted_pending_message_delta = if let Some((stop_reason, usage_json)) = pending_message_delta.take() { - let mut event = json!({ - "type": "message_delta", - "delta": { - "stop_reason": stop_reason, - "stop_sequence": null - } - }); - if let Some(uj) = usage_json { - event["usage"] = uj; - } + let event = build_message_delta_event(stop_reason, usage_json); let sse_data = format!("event: message_delta\ndata: {}\n\n", 
serde_json::to_string(&event).unwrap_or_default()); log::debug!("[Claude/OpenRouter] >>> Anthropic SSE: message_delta (at stream end)"); @@ -1031,6 +1035,42 @@ mod tests { ); } + #[tokio::test] + async fn test_message_delta_includes_zero_usage_when_stream_has_no_usage() { + let input = concat!( + "data: {\"id\":\"chatcmpl_no_usage\",\"model\":\"gpt-5.5\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_0\",\"type\":\"function\",\"function\":{\"name\":\"get_time\",\"arguments\":\"{}\"}}]}}]}\n\n", + "data: {\"id\":\"chatcmpl_no_usage\",\"model\":\"gpt-5.5\",\"choices\":[{\"delta\":{},\"finish_reason\":\"tool_calls\"}]}\n\n", + "data: [DONE]\n\n" + ); + + let events = collect_anthropic_events(input).await; + let message_deltas: Vec<&Value> = events + .iter() + .filter(|event| event_type(event) == Some("message_delta")) + .collect(); + + assert_eq!(message_deltas.len(), 1); + let message_delta = message_deltas[0]; + assert_eq!( + message_delta + .pointer("/delta/stop_reason") + .and_then(|v| v.as_str()), + Some("tool_use") + ); + assert_eq!( + message_delta + .pointer("/usage/input_tokens") + .and_then(|v| v.as_u64()), + Some(0) + ); + assert_eq!( + message_delta + .pointer("/usage/output_tokens") + .and_then(|v| v.as_u64()), + Some(0) + ); + } + #[tokio::test] async fn test_streaming_finalizes_after_finish_when_done_is_missing() { let input = concat!(