From 1c2153a2cb0673bdbe448789fb550cea8e81bf64 Mon Sep 17 00:00:00 2001 From: slicenfer <16222938+slicenfer@user.noreply.gitee.com> Date: Mon, 18 May 2026 10:13:12 +0800 Subject: [PATCH] fix(openai-claude): stabilize streaming tool_use blocks --- .../openai/claude/openai_claude_response.go | 101 ++++-- .../claude/openai_claude_response_test.go | 327 +++++++++++++++++- 2 files changed, 403 insertions(+), 25 deletions(-) diff --git a/internal/translator/openai/claude/openai_claude_response.go b/internal/translator/openai/claude/openai_claude_response.go index 1925539c1..47f3f3897 100644 --- a/internal/translator/openai/claude/openai_claude_response.go +++ b/internal/translator/openai/claude/openai_claude_response.go @@ -8,6 +8,7 @@ package claude import ( "bytes" "context" + "sort" "strings" translatorcommon "github.com/router-for-me/CLIProxyAPI/v7/internal/translator/common" @@ -26,6 +27,9 @@ type ConvertOpenAIResponseToAnthropicParams struct { Model string CreatedAt int64 ToolNameMap map[string]string + // SawToolCall is true once at least one tool_use content_block_start has + // been emitted on the wire. Using raw upstream tool_calls presence here + // can produce stop_reason=tool_use with zero announced tool blocks. SawToolCall bool // Content accumulator for streaming ContentAccumulator strings.Builder @@ -60,6 +64,9 @@ type ToolCallAccumulator struct { ID string Name string Arguments strings.Builder + // StartEmitted tracks whether content_block_start has already been sent + // for this tool index. + StartEmitted bool } // ConvertOpenAIResponseToClaude converts OpenAI streaming response format to Anthropic API format. @@ -218,9 +225,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI } toolCalls.ForEach(func(_, toolCall gjson.Result) bool { - param.SawToolCall = true index := int(toolCall.Get("index").Int()) - blockIndex := param.toolContentBlockIndex(index) // Initialize accumulator if needed if _, exists := param.ToolCallsAccumulator[index]; !exists { @@ -229,27 +234,25 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI accumulator := param.ToolCallsAccumulator[index] - // Handle tool call ID - if id := toolCall.Get("id"); id.Exists() { - accumulator.ID = id.String() + // Handle tool call ID. Only accept JSON-string, non-empty + // values so malformed upstream fields do not overwrite a + // valid ID or coerce into a content_block.id. + if id := toolCall.Get("id"); id.Exists() && id.Type == gjson.String { + if idStr := id.String(); idStr != "" { + accumulator.ID = idStr + } } - // Handle function name + // Handle function name and arguments if function := toolCall.Get("function"); function.Exists() { - if name := function.Get("name"); name.Exists() && name.String() != "" { - accumulator.Name = util.MapToolName(param.ToolNameMap, name.String()) - - stopThinkingContentBlock(param, &results) - - stopTextContentBlock(param, &results) - - // Send content_block_start for tool_use - contentBlockStartJSON := `{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}` - contentBlockStartJSONBytes := []byte(contentBlockStartJSON) - contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "index", blockIndex) - contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "content_block.id", util.SanitizeClaudeToolID(accumulator.ID)) - contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "content_block.name", accumulator.Name) - results = append(results, translatorcommon.AppendSSEEventBytes(nil, "content_block_start", contentBlockStartJSONBytes, 2)) + // Only record the name until content_block_start has been + // emitted. Some upstreams send "name": "" or repeat the + // field across chunks; reassigning after start could drift + // from what was already announced. + if !accumulator.StartEmitted { + if name := function.Get("name"); name.Exists() && name.Type == gjson.String && name.String() != "" { + accumulator.Name = util.MapToolName(param.ToolNameMap, name.String()) + } } // Handle function arguments @@ -261,6 +264,13 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI } } + // Re-check on every chunk, not only chunks with a function + // object. Some upstreams split function.name and id across + // separate deltas. + if !accumulator.StartEmitted && accumulator.Name != "" && accumulator.ID != "" && !param.ContentBlocksStopped { + emitToolUseStart(param, index, accumulator, &results) + } + return true }) } @@ -269,9 +279,12 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI // Handle finish_reason (but don't send message_delta/message_stop yet) if finishReason := root.Get("choices.0.finish_reason"); finishReason.Exists() && finishReason.String() != "" { reason := finishReason.String() - if param.SawToolCall { + switch { + case param.SawToolCall: param.FinishReason = "tool_calls" - } else { + case reason == "tool_calls": + param.FinishReason = "stop" + default: param.FinishReason = reason } @@ -289,8 +302,17 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI // Send content_block_stop for any tool calls if !param.ContentBlocksStopped { - for index := range param.ToolCallsAccumulator { + for _, index := range toolCallAccumulatorIndexes(param.ToolCallsAccumulator) { accumulator := param.ToolCallsAccumulator[index] + if !accumulator.StartEmitted { + // Belated emit for streams that supplied a valid name but + // never sent an id. SanitizeClaudeToolID("") produces the + // expected stable synthetic toolu__ ID shape. + if accumulator.Name == "" { + continue + } + emitToolUseStart(param, index, accumulator, &results) + } blockIndex := param.toolContentBlockIndex(index) // Send complete input_json_delta with all accumulated arguments @@ -353,8 +375,16 @@ func convertOpenAIDoneToAnthropic(param *ConvertOpenAIResponseToAnthropicParams) stopTextContentBlock(param, &results) if !param.ContentBlocksStopped { - for index := range param.ToolCallsAccumulator { + for _, index := range toolCallAccumulatorIndexes(param.ToolCallsAccumulator) { accumulator := param.ToolCallsAccumulator[index] + if !accumulator.StartEmitted { + // Belated emit at [DONE]; same behavior as the finish_reason + // path for name-but-no-id streams. + if accumulator.Name == "" { + continue + } + emitToolUseStart(param, index, accumulator, &results) + } blockIndex := param.toolContentBlockIndex(index) if accumulator.Arguments.Len() > 0 { @@ -547,6 +577,29 @@ func stopTextContentBlock(param *ConvertOpenAIResponseToAnthropicParams, results param.TextContentBlockIndex = -1 } +func emitToolUseStart(param *ConvertOpenAIResponseToAnthropicParams, openAIToolIndex int, accumulator *ToolCallAccumulator, results *[][]byte) { + stopThinkingContentBlock(param, results) + stopTextContentBlock(param, results) + + blockIndex := param.toolContentBlockIndex(openAIToolIndex) + contentBlockStartJSON := []byte(`{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}`) + contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "index", blockIndex) + contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "content_block.id", util.SanitizeClaudeToolID(accumulator.ID)) + contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "content_block.name", accumulator.Name) + *results = append(*results, translatorcommon.AppendSSEEventBytes(nil, "content_block_start", contentBlockStartJSON, 2)) + accumulator.StartEmitted = true + param.SawToolCall = true +} + +func toolCallAccumulatorIndexes(accumulators map[int]*ToolCallAccumulator) []int { + indexes := make([]int, 0, len(accumulators)) + for index := range accumulators { + indexes = append(indexes, index) + } + sort.Ints(indexes) + return indexes +} + // ConvertOpenAIResponseToClaudeNonStream converts a non-streaming OpenAI response to a non-streaming Anthropic response. // // Parameters: diff --git a/internal/translator/openai/claude/openai_claude_response_test.go b/internal/translator/openai/claude/openai_claude_response_test.go index 8c36fc3d8..35aa36f36 100644 --- a/internal/translator/openai/claude/openai_claude_response_test.go +++ b/internal/translator/openai/claude/openai_claude_response_test.go @@ -3,11 +3,108 @@ package claude import ( "bytes" "context" + "strings" "testing" + + "github.com/tidwall/gjson" ) +type sseEvent struct { + Type string + Payload string +} + +func runStream(t *testing.T, originalReq string, chunks ...string) []sseEvent { + t.Helper() + + var paramAny any + var emitted [][]byte + for _, chunk := range chunks { + emitted = append(emitted, ConvertOpenAIResponseToClaude( + context.Background(), + "", + []byte(originalReq), + nil, + []byte("data: "+chunk), + ¶mAny, + )...) + } + emitted = append(emitted, ConvertOpenAIResponseToClaude( + context.Background(), + "", + []byte(originalReq), + nil, + []byte("data: [DONE]"), + ¶mAny, + )...) + + var events []sseEvent + for _, raw := range emitted { + s := string(raw) + if !strings.HasPrefix(s, "event: ") { + continue + } + nl := strings.Index(s, "\n") + if nl < 0 { + continue + } + typ := strings.TrimPrefix(s[:nl], "event: ") + rest := s[nl+1:] + if !strings.HasPrefix(rest, "data: ") { + continue + } + payload := strings.TrimRight(strings.TrimPrefix(rest, "data: "), "\n") + events = append(events, sseEvent{Type: typ, Payload: payload}) + } + return events +} + +func countByType(events []sseEvent, typ string) int { + n := 0 + for _, e := range events { + if e.Type == typ { + n++ + } + } + return n +} + +func toolUseStarts(events []sseEvent) []sseEvent { + var out []sseEvent + for _, e := range events { + if e.Type != "content_block_start" { + continue + } + if gjson.Get(e.Payload, "content_block.type").String() == "tool_use" { + out = append(out, e) + } + } + return out +} + +func blockIndices(events []sseEvent) []int64 { + var idx []int64 + for _, e := range events { + if e.Type == "content_block_start" { + idx = append(idx, gjson.Get(e.Payload, "index").Int()) + } + } + return idx +} + +func lastStopReason(events []sseEvent) string { + for i := len(events) - 1; i >= 0; i-- { + if events[i].Type == "message_delta" { + return gjson.Get(events[i].Payload, "delta.stop_reason").String() + } + } + return "" +} + +const streamReq = `{"stream":true}` + func TestConvertOpenAIResponseToClaude_StreamIgnoresNullToolNameDelta(t *testing.T) { - originalRequest := []byte(`{"stream":true}`) + originalRequest := []byte(streamReq) var param any firstChunks := ConvertOpenAIResponseToClaude( @@ -39,3 +136,231 @@ func TestConvertOpenAIResponseToClaude_StreamIgnoresNullToolNameDelta(t *testing t.Fatalf("did not expect null tool name delta to emit an empty tool name, got %s", string(secondOutput)) } } + +func TestStreamingTool_EmptyNameThroughout(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_a","function":{"name":"","arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"name":"","arguments":"{\"x\":1}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + if got := len(toolUseStarts(events)); got != 0 { + t.Fatalf("expected zero tool_use content_block_start, got %d (events=%+v)", got, events) + } + if got := countByType(events, "content_block_delta"); got != 0 { + t.Fatalf("expected zero content_block_delta when start was suppressed, got %d", got) + } + if got := countByType(events, "content_block_stop"); got != 0 { + t.Fatalf("expected zero content_block_stop when start was suppressed, got %d", got) + } + if got := lastStopReason(events); got == "tool_use" { + t.Fatalf("stop_reason must not be tool_use when zero tool_use blocks were emitted; got %q", got) + } +} + +func TestStreamingTool_NullName(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_a","function":{"name":null,"arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + if got := len(toolUseStarts(events)); got != 0 { + t.Fatalf("null name must not produce a tool_use start; got %d", got) + } + if got := countByType(events, "content_block_stop"); got != 0 { + t.Fatalf("null name must not produce content_block_stop; got %d", got) + } +} + +func TestStreamingTool_NonStringName(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_a","function":{"name":123,"arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + if got := len(toolUseStarts(events)); got != 0 { + t.Fatalf("non-string name must not produce a tool_use start; got %d", got) + } +} + +func TestStreamingTool_RepeatedName(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_a","function":{"name":"do_it","arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"name":"do_it","arguments":"{\"x\""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"name":"do_it","arguments":":1}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected exactly one tool_use start, got %d", len(starts)) + } + if name := gjson.Get(starts[0].Payload, "content_block.name").String(); name != "do_it" { + t.Fatalf("announced tool name = %q, want %q", name, "do_it") + } + if got := countByType(events, "content_block_stop"); got != 1 { + t.Fatalf("expected exactly one content_block_stop, got %d", got) + } +} + +func TestStreamingTool_MixedSuppressedAndValid(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[ + {"index":0,"id":"call_skip","function":{"name":"","arguments":""}}, + {"index":1,"id":"call_real","function":{"name":"do_it","arguments":""}} + ]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[ + {"index":1,"function":{"arguments":"{}"}} + ]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected exactly one tool_use start, got %d", len(starts)) + } + if got := countByType(events, "content_block_stop"); got != 1 { + t.Fatalf("expected exactly one content_block_stop, got %d", got) + } + + indices := blockIndices(events) + if len(indices) == 0 || indices[0] != 0 { + t.Fatalf("first content_block_start index must be 0, got %v", indices) + } +} + +func TestStreamingTool_EmptyIDDeferStart(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"","function":{"name":"do_it","arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_real","function":{"arguments":"{}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected exactly one tool_use start once id arrived, got %d", len(starts)) + } + if id := gjson.Get(starts[0].Payload, "content_block.id").String(); id != "call_real" { + t.Fatalf("announced tool id = %q, want %q", id, "call_real") + } +} + +func TestStreamingTool_IDInDeltaWithoutFunction(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"function":{"name":"do_it"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_real"}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected exactly one tool_use start when id arrives in a function-less delta, got %d", len(starts)) + } + if id := gjson.Get(starts[0].Payload, "content_block.id").String(); id != "call_real" { + t.Fatalf("announced tool id = %q, want %q", id, "call_real") + } + if name := gjson.Get(starts[0].Payload, "content_block.name").String(); name != "do_it" { + t.Fatalf("announced tool name = %q, want %q", name, "do_it") + } + if got := countByType(events, "content_block_stop"); got != 1 { + t.Fatalf("expected exactly one content_block_stop, got %d", got) + } +} + +func TestStreamingTool_StopReasonWithEmittedTool(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_a","function":{"name":"do_it","arguments":"{}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`, + ) + if got := lastStopReason(events); got != "tool_use" { + t.Fatalf("stop_reason = %q, want %q", got, "tool_use") + } +} + +func TestStreamingTool_StopReasonWhenIDNeverArrives(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"function":{"name":"do_it","arguments":""}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected one belated tool_use start with synthetic id, got %d", len(starts)) + } + id := gjson.Get(starts[0].Payload, "content_block.id").String() + if !strings.HasPrefix(id, "toolu_") { + t.Fatalf("synthetic id should match toolu__, got %q", id) + } + if name := gjson.Get(starts[0].Payload, "content_block.name").String(); name != "do_it" { + t.Fatalf("announced tool name = %q, want %q", name, "do_it") + } + if got := lastStopReason(events); got != "tool_use" { + t.Fatalf("stop_reason = %q, want %q", got, "tool_use") + } +} + +func TestStreamingTool_BelatedStartsUseOpenAIToolIndexOrder(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[ + {"index":2,"function":{"name":"third_tool","arguments":"{}"}}, + {"index":0,"function":{"name":"first_tool","arguments":"{}"}}, + {"index":1,"function":{"name":"second_tool","arguments":"{}"}} + ]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 3 { + t.Fatalf("expected three belated tool_use starts, got %d", len(starts)) + } + + wantNames := []string{"first_tool", "second_tool", "third_tool"} + for i, wantName := range wantNames { + if name := gjson.Get(starts[i].Payload, "content_block.name").String(); name != wantName { + t.Fatalf("tool_use start %d name = %q, want %q (starts=%+v)", i, name, wantName, starts) + } + if blockIndex := gjson.Get(starts[i].Payload, "index").Int(); blockIndex != int64(i) { + t.Fatalf("tool_use start %d block index = %d, want %d", i, blockIndex, i) + } + } +} + +func TestStreamingTool_LateIDAfterFinalization(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"function":{"name":"do_it"}}]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":1,"completion_tokens":1}}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_late"}]}}]}`, + ) + + starts := toolUseStarts(events) + if len(starts) != 1 { + t.Fatalf("expected one belated tool_use start, got %d", len(starts)) + } + + var sawMessageStop bool + for _, e := range events { + if e.Type == "message_stop" { + sawMessageStop = true + continue + } + if sawMessageStop { + switch e.Type { + case "content_block_start", "content_block_delta", "content_block_stop": + t.Fatalf("event %q emitted after message_stop (events=%+v)", e.Type, events) + } + } + } +} + +func TestStreamingTool_StopReasonMixedSuppressedAndValid(t *testing.T) { + events := runStream(t, streamReq, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[ + {"index":0,"id":"call_skip","function":{"name":"","arguments":""}}, + {"index":1,"id":"call_real","function":{"name":"do_it","arguments":"{}"}} + ]}}]}`, + `{"id":"c1","model":"m","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + ) + if got := lastStopReason(events); got != "tool_use" { + t.Fatalf("stop_reason = %q, want %q", got, "tool_use") + } +}