From a0fe273081eaa3a4b89ec21fd718a4585b84fda2 Mon Sep 17 00:00:00 2001 From: DragonFSKY Date: Sun, 22 Mar 2026 09:49:34 +0800 Subject: [PATCH] fix(websocket): skip stale state merge after client-side compact After a Codex CLI compact, the client sends a full conversation transcript (with compaction items or assistant messages) as input. Previously, normalizeResponseSubsequentRequest() unconditionally merged this with stale lastRequest/lastResponseOutput, breaking function_call/function_call_output pairings and causing 400 errors ("No tool output found for function call"). Add inputContainsFullTranscript() heuristic that detects compaction items (type=compaction/compaction_summary) or assistant messages in the input array, and bypasses the merge when a full transcript is present. Fixes #2207 --- .../openai/openai_responses_websocket.go | 66 +++++++++--- .../openai/openai_responses_websocket_test.go | 101 ++++++++++++++++++ 2 files changed, 155 insertions(+), 12 deletions(-) diff --git a/sdk/api/handlers/openai/openai_responses_websocket.go b/sdk/api/handlers/openai/openai_responses_websocket.go index 2f6b14a77..6457cd3ee 100644 --- a/sdk/api/handlers/openai/openai_responses_websocket.go +++ b/sdk/api/handlers/openai/openai_responses_websocket.go @@ -315,20 +315,32 @@ func normalizeResponseSubsequentRequest(rawJSON []byte, lastRequest []byte, last } } - existingInput := gjson.GetBytes(lastRequest, "input") - mergedInput, errMerge := mergeJSONArrayRaw(existingInput.Raw, normalizeJSONArrayRaw(lastResponseOutput)) - if errMerge != nil { - return nil, lastRequest, &interfaces.ErrorMessage{ - StatusCode: http.StatusBadRequest, - Error: fmt.Errorf("invalid previous response output: %w", errMerge), + // When the client sends a full conversation transcript (e.g. after compact), + // the input already contains the complete history including assistant messages. + // In that case, skip merging with stale lastRequest/lastResponseOutput to avoid + // breaking function_call / function_call_output pairings. + // See: https://github.com/router-for-me/CLIProxyAPI/issues/2207 + var mergedInput string + if inputContainsFullTranscript(nextInput) { + log.Infof("responses websocket: full transcript detected, skipping stale merge (input items=%d)", len(nextInput.Array())) + mergedInput = nextInput.Raw + } else { + existingInput := gjson.GetBytes(lastRequest, "input") + var errMerge error + mergedInput, errMerge = mergeJSONArrayRaw(existingInput.Raw, normalizeJSONArrayRaw(lastResponseOutput)) + if errMerge != nil { + return nil, lastRequest, &interfaces.ErrorMessage{ + StatusCode: http.StatusBadRequest, + Error: fmt.Errorf("invalid previous response output: %w", errMerge), + } } - } - mergedInput, errMerge = mergeJSONArrayRaw(mergedInput, nextInput.Raw) - if errMerge != nil { - return nil, lastRequest, &interfaces.ErrorMessage{ - StatusCode: http.StatusBadRequest, - Error: fmt.Errorf("invalid request input: %w", errMerge), + mergedInput, errMerge = mergeJSONArrayRaw(mergedInput, nextInput.Raw) + if errMerge != nil { + return nil, lastRequest, &interfaces.ErrorMessage{ + StatusCode: http.StatusBadRequest, + Error: fmt.Errorf("invalid request input: %w", errMerge), + } } } dedupedInput, errDedupeFunctionCalls := dedupeFunctionCallsByCallID(mergedInput) @@ -691,6 +703,36 @@ func mergeJSONArrayRaw(existingRaw, appendRaw string) (string, error) { return string(out), nil } +// inputContainsFullTranscript returns true when the input array looks like a +// complete conversation history rather than an incremental append. After a +// client-side compact the input already carries the full (compacted) transcript +// which may include assistant messages or compaction items. Merging that with +// the stale lastRequest / lastResponseOutput would duplicate or break +// function_call / function_call_output pairings, so the caller should use the +// input as-is. +// +// Heuristic: the array is a full transcript when it contains either +// - a message with role="assistant", or +// - a compaction item (type="compaction" or "compaction_summary"). +// +// Normal incremental turns only contain user messages or function_call_output +// items and never carry either of these signals. +func inputContainsFullTranscript(input gjson.Result) bool { + if !input.IsArray() { + return false + } + for _, item := range input.Array() { + t := item.Get("type").String() + if t == "message" && item.Get("role").String() == "assistant" { + return true + } + if t == "compaction" || t == "compaction_summary" { + return true + } + } + return false +} + func normalizeJSONArrayRaw(raw []byte) string { trimmed := strings.TrimSpace(string(raw)) if trimmed == "" { diff --git a/sdk/api/handlers/openai/openai_responses_websocket_test.go b/sdk/api/handlers/openai/openai_responses_websocket_test.go index ecfc90b31..2c5ef579b 100644 --- a/sdk/api/handlers/openai/openai_responses_websocket_test.go +++ b/sdk/api/handlers/openai/openai_responses_websocket_test.go @@ -1400,3 +1400,104 @@ func TestResponsesWebsocketCompactionResetsTurnStateOnTranscriptReplacement(t *t t.Fatalf("post-compact function call id = %s, want call-1", items[0].Get("call_id").String()) } } + +func TestInputContainsFullTranscriptDetectsAssistantMessage(t *testing.T) { + input := gjson.Parse(`[ + {"type":"message","role":"user","content":"hello"}, + {"type":"message","role":"assistant","content":"hi there"} + ]`) + if !inputContainsFullTranscript(input) { + t.Fatal("expected full transcript when assistant message is present") + } +} + +func TestInputContainsFullTranscriptDetectsCompactionItem(t *testing.T) { + for _, typ := range []string{"compaction", "compaction_summary"} { + input := gjson.Parse(`[{"type":"message","role":"user","content":"hello"},{"type":"` + typ + `","encrypted_content":"summary"}]`) + if !inputContainsFullTranscript(input) { + t.Fatalf("expected full transcript for type=%s", typ) + } + } +} + +func TestInputContainsFullTranscriptFalseForIncremental(t *testing.T) { + // Normal incremental turns: user messages or function_call_output only. + for _, raw := range []string{ + `[{"type":"function_call_output","call_id":"call-1","output":"result"}]`, + `[{"type":"message","role":"user","content":"next question"}]`, + `[]`, + } { + if inputContainsFullTranscript(gjson.Parse(raw)) { + t.Fatalf("incremental input must not be detected as full transcript: %s", raw) + } + } +} + +func TestNormalizeSubsequentRequestCompactSkipsMerge(t *testing.T) { + lastRequest := []byte(`{"model":"gpt-5.4","stream":true,"input":[ + {"type":"message","role":"user","id":"msg-1","content":"original long prompt"}, + {"type":"message","role":"assistant","id":"msg-2","content":"original long response"}, + {"type":"function_call","id":"fc-1","call_id":"call-old","name":"bash","arguments":"{}"}, + {"type":"function_call_output","id":"fco-1","call_id":"call-old","output":"old result"} + ]}`) + lastResponseOutput := []byte(`[ + {"type":"message","role":"assistant","id":"msg-3","content":"another assistant reply"}, + {"type":"function_call","id":"fc-2","call_id":"call-stale","name":"read","arguments":"{}"} + ]`) + + // Remote compact response: user messages + compaction item, NO assistant message. + // This is the primary compact scenario from Codex CLI. + raw := []byte(`{"type":"response.create","input":[ + {"type":"message","role":"user","id":"msg-1c","content":"compacted user msg"}, + {"type":"compaction","encrypted_content":"conversation summary"} + ]}`) + + normalized, _, errMsg := normalizeResponsesWebsocketRequest(raw, lastRequest, lastResponseOutput) + if errMsg != nil { + t.Fatalf("unexpected error: %v", errMsg.Error) + } + + input := gjson.GetBytes(normalized, "input").Array() + if len(input) != 2 { + t.Fatalf("input len = %d, want 2 (compacted only); stale state was not skipped", len(input)) + } + if input[0].Get("id").String() != "msg-1c" { + t.Fatalf("input[0].id = %q, want %q", input[0].Get("id").String(), "msg-1c") + } + if input[1].Get("type").String() != "compaction" { + t.Fatalf("input[1].type = %q, want %q", input[1].Get("type").String(), "compaction") + } +} + +func TestNormalizeSubsequentRequestIncrementalInputStillMerges(t *testing.T) { + // Normal incremental flow: user sends function_call_output (no assistant message). + lastRequest := []byte(`{"model":"gpt-5.4","stream":true,"input":[ + {"type":"message","role":"user","id":"msg-1","content":"hello"} + ]}`) + lastResponseOutput := []byte(`[ + {"type":"message","role":"assistant","id":"msg-2","content":"let me check"}, + {"type":"function_call","id":"fc-1","call_id":"call-1","name":"bash","arguments":"{}"} + ]`) + raw := []byte(`{"type":"response.create","input":[ + {"type":"function_call_output","call_id":"call-1","id":"fco-1","output":"done"} + ]}`) + + normalized, _, errMsg := normalizeResponsesWebsocketRequest(raw, lastRequest, lastResponseOutput) + if errMsg != nil { + t.Fatalf("unexpected error: %v", errMsg.Error) + } + + input := gjson.GetBytes(normalized, "input").Array() + + // Should be merged: msg-1 + msg-2 + fc-1 + fco-1 = 4 items + if len(input) != 4 { + t.Fatalf("input len = %d, want 4 (merged)", len(input)) + } + wantIDs := []string{"msg-1", "msg-2", "fc-1", "fco-1"} + for i, want := range wantIDs { + got := input[i].Get("id").String() + if got != want { + t.Fatalf("input[%d].id = %q, want %q", i, got, want) + } + } +}