feat(translator): enhance response aggregation and annotation handling

- Implemented logic to aggregate text blocks until `message_stop` for improved consistency.
- Introduced support for message annotations collected from `citations_delta` events and attached to the output text content in responses.
- Added methods (`finalizeAssistantMessage`, `appendMessageAnnotation`) to handle message grouping and annotation appending cleanly.
- Updated unit tests to verify message aggregation, annotation handling, and suppression of unwanted native events.

Closes: #3801
This commit is contained in:
Luis Pater
2026-06-11 03:14:03 +08:00
parent 58bf645e66
commit dc04d8be52
2 changed files with 198 additions and 41 deletions

View File

@@ -14,20 +14,23 @@ import (
)
type claudeToResponsesState struct {
Seq int
ResponseID string
CreatedAt int64
CurrentMsgID string
CurrentFCID string
InTextBlock bool
InFuncBlock bool
FuncArgsBuf map[int]*strings.Builder // index -> args
Seq int
ResponseID string
CreatedAt int64
CurrentMsgID string
CurrentFCID string
InTextBlock bool
InFuncBlock bool
MessageOpen bool
ContentPartOpen bool
FuncArgsBuf map[int]*strings.Builder // index -> args
// function call bookkeeping for output aggregation
FuncNames map[int]string // index -> function name
FuncCallIDs map[int]string // index -> call id
// message text aggregation
TextBuf strings.Builder
CurrentTextBuf strings.Builder
TextBuf strings.Builder
CurrentTextBuf strings.Builder
MessageAnnotations []any
// reasoning state
ReasoningActive bool
ReasoningItemID string
@@ -57,6 +60,57 @@ func emitEvent(event string, payload []byte) []byte {
return translatorcommon.SSEEventData(event, payload)
}
// noSSEOutput normalizes the converter's result so that it is never nil:
// a nil slice is replaced by an empty, non-nil slice, and any other slice
// is returned unchanged.
// NOTE(review): presumably callers treat a nil result differently from an
// empty one (e.g. falling back to passthrough) — confirm against the caller.
func noSSEOutput(out [][]byte) [][]byte {
	if out != nil {
		return out
	}
	return [][]byte{}
}
// appendMessageAnnotation records one annotation for the current message by
// appending it to st.MessageAnnotations. A nil annotation is ignored.
func (st *claudeToResponsesState) appendMessageAnnotation(annotation any) {
	if annotation != nil {
		st.MessageAnnotations = append(st.MessageAnnotations, annotation)
	}
}
// finalizeAssistantMessage closes the currently open assistant message item.
//
// When no message is open (st.MessageOpen is false) it returns nil and emits
// nothing. Otherwise it returns three SSE events, in this order, each stamped
// with a fresh sequence number obtained from nextSeq:
//
//  1. response.output_text.done
//  2. response.content_part.done
//  3. response.output_item.done
//
// All three carry the text accumulated in st.TextBuf; the latter two also
// carry st.MessageAnnotations when any were collected. Afterwards the
// open-message flags are cleared and st.CurrentTextBuf is reset. st.TextBuf
// and st.MessageAnnotations are not reset here.
func (st *claudeToResponsesState) finalizeAssistantMessage(nextSeq func() int) [][]byte {
	if !st.MessageOpen {
		return nil
	}

	aggregated := st.TextBuf.String()
	withAnnotations := len(st.MessageAnnotations) > 0
	var events [][]byte

	// Text is complete.
	textDone := []byte(`{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}`)
	textDone, _ = sjson.SetBytes(textDone, "sequence_number", nextSeq())
	textDone, _ = sjson.SetBytes(textDone, "item_id", st.CurrentMsgID)
	textDone, _ = sjson.SetBytes(textDone, "text", aggregated)
	events = append(events, emitEvent("response.output_text.done", textDone))

	// Content part is complete.
	contentPartDone := []byte(`{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`)
	contentPartDone, _ = sjson.SetBytes(contentPartDone, "sequence_number", nextSeq())
	contentPartDone, _ = sjson.SetBytes(contentPartDone, "item_id", st.CurrentMsgID)
	contentPartDone, _ = sjson.SetBytes(contentPartDone, "part.text", aggregated)
	if withAnnotations {
		contentPartDone, _ = sjson.SetBytes(contentPartDone, "part.annotations", st.MessageAnnotations)
	}
	events = append(events, emitEvent("response.content_part.done", contentPartDone))

	// Message item is complete.
	itemDone := []byte(`{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}}`)
	itemDone, _ = sjson.SetBytes(itemDone, "sequence_number", nextSeq())
	itemDone, _ = sjson.SetBytes(itemDone, "item.id", st.CurrentMsgID)
	itemDone, _ = sjson.SetBytes(itemDone, "item.content.0.text", aggregated)
	if withAnnotations {
		itemDone, _ = sjson.SetBytes(itemDone, "item.content.0.annotations", st.MessageAnnotations)
	}
	events = append(events, emitEvent("response.output_item.done", itemDone))

	// Mark the message as closed so a second call becomes a no-op.
	st.MessageOpen = false
	st.ContentPartOpen = false
	st.InTextBlock = false
	st.CurrentTextBuf.Reset()

	return events
}
// ConvertClaudeResponseToOpenAIResponses converts Claude SSE to OpenAI Responses SSE events.
func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, param *any) [][]byte {
if *param == nil {
@@ -83,10 +137,13 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
// Reset per-message aggregation state
st.TextBuf.Reset()
st.CurrentTextBuf.Reset()
st.MessageAnnotations = nil
st.ReasoningBuf.Reset()
st.ReasoningActive = false
st.InTextBlock = false
st.InFuncBlock = false
st.MessageOpen = false
st.ContentPartOpen = false
st.CurrentMsgID = ""
st.CurrentFCID = ""
st.ReasoningItemID = ""
@@ -125,24 +182,29 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
case "content_block_start":
cb := root.Get("content_block")
if !cb.Exists() {
return out
return noSSEOutput(out)
}
idx := int(root.Get("index").Int())
typ := cb.Get("type").String()
if typ == "text" {
// open message item + content part
st.InTextBlock = true
st.CurrentTextBuf.Reset()
st.CurrentMsgID = fmt.Sprintf("msg_%s_0", st.ResponseID)
item := []byte(`{"type":"response.output_item.added","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"in_progress","content":[],"role":"assistant"}}`)
item, _ = sjson.SetBytes(item, "sequence_number", nextSeq())
item, _ = sjson.SetBytes(item, "item.id", st.CurrentMsgID)
out = append(out, emitEvent("response.output_item.added", item))
part := []byte(`{"type":"response.content_part.added","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`)
part, _ = sjson.SetBytes(part, "sequence_number", nextSeq())
part, _ = sjson.SetBytes(part, "item_id", st.CurrentMsgID)
out = append(out, emitEvent("response.content_part.added", part))
if st.CurrentMsgID == "" {
st.CurrentMsgID = fmt.Sprintf("msg_%s_0", st.ResponseID)
}
if !st.MessageOpen {
item := []byte(`{"type":"response.output_item.added","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"in_progress","content":[],"role":"assistant"}}`)
item, _ = sjson.SetBytes(item, "sequence_number", nextSeq())
item, _ = sjson.SetBytes(item, "item.id", st.CurrentMsgID)
out = append(out, emitEvent("response.output_item.added", item))
st.MessageOpen = true
}
if !st.ContentPartOpen {
part := []byte(`{"type":"response.content_part.added","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`)
part, _ = sjson.SetBytes(part, "sequence_number", nextSeq())
part, _ = sjson.SetBytes(part, "item_id", st.CurrentMsgID)
out = append(out, emitEvent("response.content_part.added", part))
st.ContentPartOpen = true
}
} else if typ == "tool_use" {
st.InFuncBlock = true
st.CurrentFCID = cb.Get("id").String()
@@ -187,7 +249,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
case "content_block_delta":
d := root.Get("delta")
if !d.Exists() {
return out
return noSSEOutput(out)
}
dt := d.Get("type").String()
if dt == "text_delta" {
@@ -202,6 +264,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
st.CurrentTextBuf.WriteString(t.String())
}
} else if dt == "input_json_delta" {
if !st.InFuncBlock || st.CurrentFCID == "" {
return [][]byte{}
}
idx := int(root.Get("index").Int())
if pj := d.Get("partial_json"); pj.Exists() {
if st.FuncArgsBuf[idx] == nil {
@@ -233,26 +298,16 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
st.ReasoningSignature = signature.String()
}
}
return [][]byte{}
} else if dt == "citations_delta" {
if citation := d.Get("citation"); citation.Exists() {
st.appendMessageAnnotation(citation.Value())
}
return [][]byte{}
}
case "content_block_stop":
idx := int(root.Get("index").Int())
if st.InTextBlock {
fullText := st.CurrentTextBuf.String()
done := []byte(`{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}`)
done, _ = sjson.SetBytes(done, "sequence_number", nextSeq())
done, _ = sjson.SetBytes(done, "item_id", st.CurrentMsgID)
done, _ = sjson.SetBytes(done, "text", fullText)
out = append(out, emitEvent("response.output_text.done", done))
partDone := []byte(`{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}`)
partDone, _ = sjson.SetBytes(partDone, "sequence_number", nextSeq())
partDone, _ = sjson.SetBytes(partDone, "item_id", st.CurrentMsgID)
partDone, _ = sjson.SetBytes(partDone, "part.text", fullText)
out = append(out, emitEvent("response.content_part.done", partDone))
final := []byte(`{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","text":""}],"role":"assistant"}}`)
final, _ = sjson.SetBytes(final, "sequence_number", nextSeq())
final, _ = sjson.SetBytes(final, "item.id", st.CurrentMsgID)
final, _ = sjson.SetBytes(final, "item.content.0.text", fullText)
out = append(out, emitEvent("response.output_item.done", final))
st.InTextBlock = false
} else if st.InFuncBlock {
args := "{}"
@@ -304,6 +359,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
st.ReasoningActive = false
st.ReasoningPartAdded = false
}
return noSSEOutput(out)
case "message_delta":
if usage := root.Get("usage"); usage.Exists() {
if v := usage.Get("output_tokens"); v.Exists() {
@@ -315,7 +371,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
st.UsageSeen = true
}
}
return [][]byte{}
case "message_stop":
out = append(out, st.finalizeAssistantMessage(nextSeq)...)
completed := []byte(`{"type":"response.completed","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"completed","background":false,"error":null}}`)
completed, _ = sjson.SetBytes(completed, "sequence_number", nextSeq())
@@ -407,6 +465,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
item := []byte(`{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}`)
item, _ = sjson.SetBytes(item, "id", st.CurrentMsgID)
item, _ = sjson.SetBytes(item, "content.0.text", st.TextBuf.String())
if len(st.MessageAnnotations) > 0 {
item, _ = sjson.SetBytes(item, "content.0.annotations", st.MessageAnnotations)
}
outputsWrapper, _ = sjson.SetRawBytes(outputsWrapper, "arr.-1", item)
}
// function_call items (in ascending index order for determinism)
@@ -466,7 +527,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
out = append(out, emitEvent("response.completed", completed))
}
return out
return noSSEOutput(out)
}
// ConvertClaudeResponseToOpenAIResponsesNonStream aggregates Claude SSE into a single OpenAI Responses JSON.
@@ -506,6 +567,7 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
reasoningActive bool
reasoningItemID string
reasoningSig string
annotations []any
inputTokens int64
outputTokens int64
)
@@ -592,6 +654,10 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
reasoningSig = signature.String()
}
}
case "citations_delta":
if citation := d.Get("citation"); citation.Exists() {
annotations = append(annotations, citation.Value())
}
}
case "content_block_stop":
@@ -692,6 +758,9 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
item := []byte(`{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}`)
item, _ = sjson.SetBytes(item, "id", currentMsgID)
item, _ = sjson.SetBytes(item, "content.0.text", textBuf.String())
if len(annotations) > 0 {
item, _ = sjson.SetBytes(item, "content.0.annotations", annotations)
}
outputsWrapper, _ = sjson.SetRawBytes(outputsWrapper, "arr.-1", item)
}
if len(toolCalls) > 0 {

View File

@@ -5,6 +5,7 @@ import (
"strings"
"testing"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator"
"github.com/tidwall/gjson"
)
@@ -29,6 +30,15 @@ func parseClaudeResponsesSSEEvent(t *testing.T, chunk []byte) (string, gjson.Res
return event, gjson.Parse(data)
}
// translateClaudeResponsesStreamThroughRegistry feeds each chunk, in order,
// through sdktranslator.TranslateStream (Claude -> OpenAI Responses) and
// returns the concatenation of everything that was emitted. A single param
// value is shared across all calls so state carries over between chunks.
func translateClaudeResponsesStreamThroughRegistry(chunks [][]byte) [][]byte {
	var (
		param     any
		collected [][]byte
	)
	for i := range chunks {
		emitted := sdktranslator.TranslateStream(context.Background(), sdktranslator.FormatClaude, sdktranslator.FormatOpenAIResponse, "claude-test", nil, nil, chunks[i], &param)
		collected = append(collected, emitted...)
	}
	return collected
}
func TestConvertClaudeResponseToOpenAIResponses_ThinkingIncludesSignature(t *testing.T) {
signature := "claude_sig_123"
chunks := [][]byte{
@@ -78,6 +88,84 @@ func TestConvertClaudeResponseToOpenAIResponses_ThinkingIncludesSignature(t *tes
}
}
// TestConvertClaudeResponseToOpenAIResponses_SuppressesSignatureDeltaPassthrough
// checks that a lone signature_delta chunk, translated through the registry,
// produces no output chunks at all.
func TestConvertClaudeResponseToOpenAIResponses_SuppressesSignatureDeltaPassthrough(t *testing.T) {
	signatureDelta := []byte(`data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"claude_sig_123"}}`)

	if got := translateClaudeResponsesStreamThroughRegistry([][]byte{signatureDelta}); len(got) != 0 {
		t.Fatalf("expected signature_delta to be suppressed, got %d chunks", len(got))
	}
}
func TestConvertClaudeResponseToOpenAIResponses_AggregatesTextBlocksUntilMessageStop(t *testing.T) {
chunks := [][]byte{
[]byte(`data: {"type":"message_start","message":{"id":"msg_123","usage":{"input_tokens":1,"output_tokens":0}}}`),
[]byte(`data: {"type":"content_block_start","index":4,"content_block":{"type":"text","text":""}}`),
[]byte(`data: {"type":"content_block_delta","index":4,"delta":{"type":"text_delta","text":"**对比竞品**\n- "}}`),
[]byte(`data: {"type":"content_block_stop","index":4}`),
[]byte(`data: {"type":"content_block_start","index":5,"content_block":{"type":"server_tool_use","id":"srv_123","name":"web_search","input":{}}}`),
[]byte(`data: {"type":"content_block_delta","index":5,"delta":{"type":"input_json_delta","partial_json":"{\"query\":\"Qwen3\"}"}}`),
[]byte(`data: {"type":"content_block_stop","index":5}`),
[]byte(`data: {"type":"content_block_start","index":6,"content_block":{"type":"web_search_tool_result","tool_use_id":"srv_123","content":[{"type":"web_search_result","title":"Example","url":"https://example.com"}]}}`),
[]byte(`data: {"type":"content_block_stop","index":6}`),
[]byte(`data: {"type":"content_block_delta","index":5,"delta":{"type":"citations_delta","citation":{"type":"web_search_result_location","cited_text":"Qwen 3.7 Max","url":"https://example.com","title":"Example"}}}`),
[]byte(`data: {"type":"content_block_start","index":7,"content_block":{"type":"text","text":""}}`),
[]byte(`data: {"type":"content_block_delta","index":7,"delta":{"type":"text_delta","text":"Qwen 3.7 Max leads."}}`),
[]byte(`data: {"type":"content_block_stop","index":7}`),
[]byte(`data: {"type":"message_delta","usage":{"output_tokens":12}}`),
[]byte(`data: {"type":"message_stop"}`),
}
outputs := translateClaudeResponsesStreamThroughRegistry(chunks)
counts := map[string]int{}
var outputTextDone gjson.Result
var completed gjson.Result
for _, output := range outputs {
event, data := parseClaudeResponsesSSEEvent(t, output)
counts[event]++
if event == "response.output_text.done" {
outputTextDone = data
}
if event == "response.completed" {
completed = data
}
if strings.HasPrefix(event, "content_block_") || event == "message_delta" {
t.Fatalf("unexpected anthropic-native event leaked: %s", event)
}
}
if counts["response.output_item.added"] != 1 {
t.Fatalf("response.output_item.added count = %d, want 1", counts["response.output_item.added"])
}
if counts["response.content_part.added"] != 1 {
t.Fatalf("response.content_part.added count = %d, want 1", counts["response.content_part.added"])
}
if counts["response.output_text.done"] != 1 {
t.Fatalf("response.output_text.done count = %d, want 1", counts["response.output_text.done"])
}
if counts["response.content_part.done"] != 1 {
t.Fatalf("response.content_part.done count = %d, want 1", counts["response.content_part.done"])
}
if counts["response.output_item.done"] != 1 {
t.Fatalf("response.output_item.done count = %d, want 1", counts["response.output_item.done"])
}
if counts["response.function_call_arguments.delta"] != 0 {
t.Fatalf("response.function_call_arguments.delta count = %d, want 0", counts["response.function_call_arguments.delta"])
}
wantText := "**对比竞品**\n- Qwen 3.7 Max leads."
if got := outputTextDone.Get("text").String(); got != wantText {
t.Fatalf("output_text.done text = %q, want %q", got, wantText)
}
if got := completed.Get("response.output.0.content.0.text").String(); got != wantText {
t.Fatalf("completed message text = %q, want %q", got, wantText)
}
if got := completed.Get("response.output.0.content.0.annotations.0.type").String(); got != "web_search_result_location" {
t.Fatalf("completed annotation type = %q", got)
}
}
func TestConvertClaudeResponseToOpenAIResponsesNonStream_ThinkingIncludesSignature(t *testing.T) {
signature := "claude_sig_nonstream"
raw := []byte(strings.Join([]string{