From dc04d8be52f71eb1051e17825ea93b6b1ed08036 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Thu, 11 Jun 2026 03:14:03 +0800 Subject: [PATCH] feat(translator): enhance response aggregation and annotation handling - Implemented logic to aggregate text blocks until `message_stop` for improved consistency. - Introduced support for message annotations like `citations_delta` in content responses. - Added methods (`finalizeAssistantMessage`, `appendMessageAnnotation`) to handle message grouping and annotation appending cleanly. - Updated unit tests to verify message aggregation, annotation handling, and suppression of unwanted native events. Closes: #3801 --- .../claude_openai-responses_response.go | 151 +++++++++++++----- .../claude_openai-responses_response_test.go | 88 ++++++++++ 2 files changed, 198 insertions(+), 41 deletions(-) diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index 6cf818091..d87397b34 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -14,20 +14,23 @@ import ( ) type claudeToResponsesState struct { - Seq int - ResponseID string - CreatedAt int64 - CurrentMsgID string - CurrentFCID string - InTextBlock bool - InFuncBlock bool - FuncArgsBuf map[int]*strings.Builder // index -> args + Seq int + ResponseID string + CreatedAt int64 + CurrentMsgID string + CurrentFCID string + InTextBlock bool + InFuncBlock bool + MessageOpen bool + ContentPartOpen bool + FuncArgsBuf map[int]*strings.Builder // index -> args // function call bookkeeping for output aggregation FuncNames map[int]string // index -> function name FuncCallIDs map[int]string // index -> call id // message text aggregation - TextBuf strings.Builder - CurrentTextBuf strings.Builder + TextBuf strings.Builder + CurrentTextBuf strings.Builder + 
MessageAnnotations []any // reasoning state ReasoningActive bool ReasoningItemID string @@ -57,6 +60,57 @@ func emitEvent(event string, payload []byte) []byte { return translatorcommon.SSEEventData(event, payload) } +func noSSEOutput(out [][]byte) [][]byte { + if out == nil { + return [][]byte{} + } + return out +} + +func (st *claudeToResponsesState) appendMessageAnnotation(annotation any) { + if annotation == nil { + return + } + st.MessageAnnotations = append(st.MessageAnnotations, annotation) +} + +func (st *claudeToResponsesState) finalizeAssistantMessage(nextSeq func() int) [][]byte { + if !st.MessageOpen { + return nil + } + fullText := st.TextBuf.String() + var out [][]byte + done := []byte(`{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}`) + done, _ = sjson.SetBytes(done, "sequence_number", nextSeq()) + done, _ = sjson.SetBytes(done, "item_id", st.CurrentMsgID) + done, _ = sjson.SetBytes(done, "text", fullText) + out = append(out, emitEvent("response.output_text.done", done)) + + partDone := []byte(`{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`) + partDone, _ = sjson.SetBytes(partDone, "sequence_number", nextSeq()) + partDone, _ = sjson.SetBytes(partDone, "item_id", st.CurrentMsgID) + partDone, _ = sjson.SetBytes(partDone, "part.text", fullText) + if len(st.MessageAnnotations) > 0 { + partDone, _ = sjson.SetBytes(partDone, "part.annotations", st.MessageAnnotations) + } + out = append(out, emitEvent("response.content_part.done", partDone)) + + final := []byte(`{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}}`) + final, _ = sjson.SetBytes(final, "sequence_number", nextSeq()) + 
final, _ = sjson.SetBytes(final, "item.id", st.CurrentMsgID) + final, _ = sjson.SetBytes(final, "item.content.0.text", fullText) + if len(st.MessageAnnotations) > 0 { + final, _ = sjson.SetBytes(final, "item.content.0.annotations", st.MessageAnnotations) + } + out = append(out, emitEvent("response.output_item.done", final)) + + st.InTextBlock = false + st.MessageOpen = false + st.ContentPartOpen = false + st.CurrentTextBuf.Reset() + return out +} + // ConvertClaudeResponseToOpenAIResponses converts Claude SSE to OpenAI Responses SSE events. func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, param *any) [][]byte { if *param == nil { @@ -83,10 +137,13 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin // Reset per-message aggregation state st.TextBuf.Reset() st.CurrentTextBuf.Reset() + st.MessageAnnotations = nil st.ReasoningBuf.Reset() st.ReasoningActive = false st.InTextBlock = false st.InFuncBlock = false + st.MessageOpen = false + st.ContentPartOpen = false st.CurrentMsgID = "" st.CurrentFCID = "" st.ReasoningItemID = "" @@ -125,24 +182,29 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin case "content_block_start": cb := root.Get("content_block") if !cb.Exists() { - return out + return noSSEOutput(out) } idx := int(root.Get("index").Int()) typ := cb.Get("type").String() if typ == "text" { - // open message item + content part st.InTextBlock = true - st.CurrentTextBuf.Reset() - st.CurrentMsgID = fmt.Sprintf("msg_%s_0", st.ResponseID) - item := []byte(`{"type":"response.output_item.added","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"in_progress","content":[],"role":"assistant"}}`) - item, _ = sjson.SetBytes(item, "sequence_number", nextSeq()) - item, _ = sjson.SetBytes(item, "item.id", st.CurrentMsgID) - out = append(out, emitEvent("response.output_item.added", item)) - - part 
:= []byte(`{"type":"response.content_part.added","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`) - part, _ = sjson.SetBytes(part, "sequence_number", nextSeq()) - part, _ = sjson.SetBytes(part, "item_id", st.CurrentMsgID) - out = append(out, emitEvent("response.content_part.added", part)) + if st.CurrentMsgID == "" { + st.CurrentMsgID = fmt.Sprintf("msg_%s_0", st.ResponseID) + } + if !st.MessageOpen { + item := []byte(`{"type":"response.output_item.added","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"in_progress","content":[],"role":"assistant"}}`) + item, _ = sjson.SetBytes(item, "sequence_number", nextSeq()) + item, _ = sjson.SetBytes(item, "item.id", st.CurrentMsgID) + out = append(out, emitEvent("response.output_item.added", item)) + st.MessageOpen = true + } + if !st.ContentPartOpen { + part := []byte(`{"type":"response.content_part.added","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`) + part, _ = sjson.SetBytes(part, "sequence_number", nextSeq()) + part, _ = sjson.SetBytes(part, "item_id", st.CurrentMsgID) + out = append(out, emitEvent("response.content_part.added", part)) + st.ContentPartOpen = true + } } else if typ == "tool_use" { st.InFuncBlock = true st.CurrentFCID = cb.Get("id").String() @@ -187,7 +249,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin case "content_block_delta": d := root.Get("delta") if !d.Exists() { - return out + return noSSEOutput(out) } dt := d.Get("type").String() if dt == "text_delta" { @@ -202,6 +264,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin st.CurrentTextBuf.WriteString(t.String()) } } else if dt == "input_json_delta" { + if !st.InFuncBlock || st.CurrentFCID == "" { + return [][]byte{} + } idx := int(root.Get("index").Int()) if pj 
:= d.Get("partial_json"); pj.Exists() { if st.FuncArgsBuf[idx] == nil { @@ -233,26 +298,16 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin st.ReasoningSignature = signature.String() } } + return [][]byte{} + } else if dt == "citations_delta" { + if citation := d.Get("citation"); citation.Exists() { + st.appendMessageAnnotation(citation.Value()) + } + return [][]byte{} } case "content_block_stop": idx := int(root.Get("index").Int()) if st.InTextBlock { - fullText := st.CurrentTextBuf.String() - done := []byte(`{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}`) - done, _ = sjson.SetBytes(done, "sequence_number", nextSeq()) - done, _ = sjson.SetBytes(done, "item_id", st.CurrentMsgID) - done, _ = sjson.SetBytes(done, "text", fullText) - out = append(out, emitEvent("response.output_text.done", done)) - partDone := []byte(`{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`) - partDone, _ = sjson.SetBytes(partDone, "sequence_number", nextSeq()) - partDone, _ = sjson.SetBytes(partDone, "item_id", st.CurrentMsgID) - partDone, _ = sjson.SetBytes(partDone, "part.text", fullText) - out = append(out, emitEvent("response.content_part.done", partDone)) - final := []byte(`{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","text":""}],"role":"assistant"}}`) - final, _ = sjson.SetBytes(final, "sequence_number", nextSeq()) - final, _ = sjson.SetBytes(final, "item.id", st.CurrentMsgID) - final, _ = sjson.SetBytes(final, "item.content.0.text", fullText) - out = append(out, emitEvent("response.output_item.done", final)) st.InTextBlock = false } else if st.InFuncBlock { args := "{}" @@ -304,6 +359,7 @@ func 
ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin st.ReasoningActive = false st.ReasoningPartAdded = false } + return noSSEOutput(out) case "message_delta": if usage := root.Get("usage"); usage.Exists() { if v := usage.Get("output_tokens"); v.Exists() { @@ -315,7 +371,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin st.UsageSeen = true } } + return [][]byte{} case "message_stop": + out = append(out, st.finalizeAssistantMessage(nextSeq)...) completed := []byte(`{"type":"response.completed","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"completed","background":false,"error":null}}`) completed, _ = sjson.SetBytes(completed, "sequence_number", nextSeq()) @@ -407,6 +465,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin item := []byte(`{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}`) item, _ = sjson.SetBytes(item, "id", st.CurrentMsgID) item, _ = sjson.SetBytes(item, "content.0.text", st.TextBuf.String()) + if len(st.MessageAnnotations) > 0 { + item, _ = sjson.SetBytes(item, "content.0.annotations", st.MessageAnnotations) + } outputsWrapper, _ = sjson.SetRawBytes(outputsWrapper, "arr.-1", item) } // function_call items (in ascending index order for determinism) @@ -466,7 +527,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin out = append(out, emitEvent("response.completed", completed)) } - return out + return noSSEOutput(out) } // ConvertClaudeResponseToOpenAIResponsesNonStream aggregates Claude SSE into a single OpenAI Responses JSON. 
@@ -506,6 +567,7 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string reasoningActive bool reasoningItemID string reasoningSig string + annotations []any inputTokens int64 outputTokens int64 ) @@ -592,6 +654,10 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string reasoningSig = signature.String() } } + case "citations_delta": + if citation := d.Get("citation"); citation.Exists() { + annotations = append(annotations, citation.Value()) + } } case "content_block_stop": @@ -692,6 +758,9 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string item := []byte(`{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}`) item, _ = sjson.SetBytes(item, "id", currentMsgID) item, _ = sjson.SetBytes(item, "content.0.text", textBuf.String()) + if len(annotations) > 0 { + item, _ = sjson.SetBytes(item, "content.0.annotations", annotations) + } outputsWrapper, _ = sjson.SetRawBytes(outputsWrapper, "arr.-1", item) } if len(toolCalls) > 0 { diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go b/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go index 8161d0b29..90d19ec52 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator" "github.com/tidwall/gjson" ) @@ -29,6 +30,15 @@ func parseClaudeResponsesSSEEvent(t *testing.T, chunk []byte) (string, gjson.Res return event, gjson.Parse(data) } +func translateClaudeResponsesStreamThroughRegistry(chunks [][]byte) [][]byte { + var param any + var outputs [][]byte + for _, chunk := range chunks { + outputs = append(outputs, 
sdktranslator.TranslateStream(context.Background(), sdktranslator.FormatClaude, sdktranslator.FormatOpenAIResponse, "claude-test", nil, nil, chunk, &param)...) + } + return outputs +} + func TestConvertClaudeResponseToOpenAIResponses_ThinkingIncludesSignature(t *testing.T) { signature := "claude_sig_123" chunks := [][]byte{ @@ -78,6 +88,84 @@ func TestConvertClaudeResponseToOpenAIResponses_ThinkingIncludesSignature(t *tes } } +func TestConvertClaudeResponseToOpenAIResponses_SuppressesSignatureDeltaPassthrough(t *testing.T) { + chunk := []byte(`data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"claude_sig_123"}}`) + + outputs := translateClaudeResponsesStreamThroughRegistry([][]byte{chunk}) + if len(outputs) != 0 { + t.Fatalf("expected signature_delta to be suppressed, got %d chunks", len(outputs)) + } +} + +func TestConvertClaudeResponseToOpenAIResponses_AggregatesTextBlocksUntilMessageStop(t *testing.T) { + chunks := [][]byte{ + []byte(`data: {"type":"message_start","message":{"id":"msg_123","usage":{"input_tokens":1,"output_tokens":0}}}`), + []byte(`data: {"type":"content_block_start","index":4,"content_block":{"type":"text","text":""}}`), + []byte(`data: {"type":"content_block_delta","index":4,"delta":{"type":"text_delta","text":"**对比竞品**\n- "}}`), + []byte(`data: {"type":"content_block_stop","index":4}`), + []byte(`data: {"type":"content_block_start","index":5,"content_block":{"type":"server_tool_use","id":"srv_123","name":"web_search","input":{}}}`), + []byte(`data: {"type":"content_block_delta","index":5,"delta":{"type":"input_json_delta","partial_json":"{\"query\":\"Qwen3\"}"}}`), + []byte(`data: {"type":"content_block_stop","index":5}`), + []byte(`data: {"type":"content_block_start","index":6,"content_block":{"type":"web_search_tool_result","tool_use_id":"srv_123","content":[{"type":"web_search_result","title":"Example","url":"https://example.com"}]}}`), + []byte(`data: {"type":"content_block_stop","index":6}`), +
[]byte(`data: {"type":"content_block_delta","index":5,"delta":{"type":"citations_delta","citation":{"type":"web_search_result_location","cited_text":"Qwen 3.7 Max","url":"https://example.com","title":"Example"}}}`), + []byte(`data: {"type":"content_block_start","index":7,"content_block":{"type":"text","text":""}}`), + []byte(`data: {"type":"content_block_delta","index":7,"delta":{"type":"text_delta","text":"Qwen 3.7 Max leads."}}`), + []byte(`data: {"type":"content_block_stop","index":7}`), + []byte(`data: {"type":"message_delta","usage":{"output_tokens":12}}`), + []byte(`data: {"type":"message_stop"}`), + } + + outputs := translateClaudeResponsesStreamThroughRegistry(chunks) + + counts := map[string]int{} + var outputTextDone gjson.Result + var completed gjson.Result + for _, output := range outputs { + event, data := parseClaudeResponsesSSEEvent(t, output) + counts[event]++ + if event == "response.output_text.done" { + outputTextDone = data + } + if event == "response.completed" { + completed = data + } + if strings.HasPrefix(event, "content_block_") || event == "message_delta" { + t.Fatalf("unexpected anthropic-native event leaked: %s", event) + } + } + + if counts["response.output_item.added"] != 1 { + t.Fatalf("response.output_item.added count = %d, want 1", counts["response.output_item.added"]) + } + if counts["response.content_part.added"] != 1 { + t.Fatalf("response.content_part.added count = %d, want 1", counts["response.content_part.added"]) + } + if counts["response.output_text.done"] != 1 { + t.Fatalf("response.output_text.done count = %d, want 1", counts["response.output_text.done"]) + } + if counts["response.content_part.done"] != 1 { + t.Fatalf("response.content_part.done count = %d, want 1", counts["response.content_part.done"]) + } + if counts["response.output_item.done"] != 1 { + t.Fatalf("response.output_item.done count = %d, want 1", counts["response.output_item.done"]) + } + if counts["response.function_call_arguments.delta"] != 0 { + 
t.Fatalf("response.function_call_arguments.delta count = %d, want 0", counts["response.function_call_arguments.delta"]) + } + + wantText := "**对比竞品**\n- Qwen 3.7 Max leads." + if got := outputTextDone.Get("text").String(); got != wantText { + t.Fatalf("output_text.done text = %q, want %q", got, wantText) + } + if got := completed.Get("response.output.0.content.0.text").String(); got != wantText { + t.Fatalf("completed message text = %q, want %q", got, wantText) + } + if got := completed.Get("response.output.0.content.0.annotations.0.type").String(); got != "web_search_result_location" { + t.Fatalf("completed annotation type = %q", got) + } +} + func TestConvertClaudeResponseToOpenAIResponsesNonStream_ThinkingIncludesSignature(t *testing.T) { signature := "claude_sig_nonstream" raw := []byte(strings.Join([]string{