fix(websocket): skip stale state merge after client-side compact

After a Codex CLI compact, the client sends a full conversation
transcript (with compaction items or assistant messages) as input.
Previously, normalizeResponseSubsequentRequest() unconditionally
merged this with stale lastRequest/lastResponseOutput, breaking
function_call/function_call_output pairings and causing 400 errors
("No tool output found for function call").

Add an inputContainsFullTranscript() heuristic that detects compaction
items (type=compaction/compaction_summary) or assistant messages in
the input array, and bypass the merge whenever such a full transcript
is present.

Fixes #2207
This commit is contained in:
DragonFSKY
2026-03-22 09:49:34 +08:00
parent cad45ffa33
commit a0fe273081
2 changed files with 155 additions and 12 deletions

View File

@@ -315,20 +315,32 @@ func normalizeResponseSubsequentRequest(rawJSON []byte, lastRequest []byte, last
}
}
existingInput := gjson.GetBytes(lastRequest, "input")
mergedInput, errMerge := mergeJSONArrayRaw(existingInput.Raw, normalizeJSONArrayRaw(lastResponseOutput))
if errMerge != nil {
return nil, lastRequest, &interfaces.ErrorMessage{
StatusCode: http.StatusBadRequest,
Error: fmt.Errorf("invalid previous response output: %w", errMerge),
// When the client sends a full conversation transcript (e.g. after compact),
// the input already contains the complete history including assistant messages.
// In that case, skip merging with stale lastRequest/lastResponseOutput to avoid
// breaking function_call / function_call_output pairings.
// See: https://github.com/router-for-me/CLIProxyAPI/issues/2207
var mergedInput string
if inputContainsFullTranscript(nextInput) {
log.Infof("responses websocket: full transcript detected, skipping stale merge (input items=%d)", len(nextInput.Array()))
mergedInput = nextInput.Raw
} else {
existingInput := gjson.GetBytes(lastRequest, "input")
var errMerge error
mergedInput, errMerge = mergeJSONArrayRaw(existingInput.Raw, normalizeJSONArrayRaw(lastResponseOutput))
if errMerge != nil {
return nil, lastRequest, &interfaces.ErrorMessage{
StatusCode: http.StatusBadRequest,
Error: fmt.Errorf("invalid previous response output: %w", errMerge),
}
}
}
mergedInput, errMerge = mergeJSONArrayRaw(mergedInput, nextInput.Raw)
if errMerge != nil {
return nil, lastRequest, &interfaces.ErrorMessage{
StatusCode: http.StatusBadRequest,
Error: fmt.Errorf("invalid request input: %w", errMerge),
mergedInput, errMerge = mergeJSONArrayRaw(mergedInput, nextInput.Raw)
if errMerge != nil {
return nil, lastRequest, &interfaces.ErrorMessage{
StatusCode: http.StatusBadRequest,
Error: fmt.Errorf("invalid request input: %w", errMerge),
}
}
}
dedupedInput, errDedupeFunctionCalls := dedupeFunctionCallsByCallID(mergedInput)
@@ -691,6 +703,36 @@ func mergeJSONArrayRaw(existingRaw, appendRaw string) (string, error) {
return string(out), nil
}
// inputContainsFullTranscript reports whether input appears to be an entire
// conversation history instead of an incremental addition to the previous turn.
//
// Following a client-side compact, the client resends the whole (compacted)
// transcript. Combining that transcript with the stale lastRequest /
// lastResponseOutput state would duplicate items or break
// function_call / function_call_output pairings, so callers are expected to
// forward such input unchanged.
//
// Detection is heuristic. The input counts as a full transcript as soon as any
// element of the array is one of:
//   - a "message" item whose role is "assistant";
//   - a compaction item, i.e. type "compaction" or "compaction_summary".
//
// Ordinary incremental turns are expected to hold only user messages or
// function_call_output items, neither of which triggers the heuristic. A value
// that is not a JSON array always yields false.
func inputContainsFullTranscript(input gjson.Result) bool {
	if !input.IsArray() {
		return false
	}
	for _, entry := range input.Array() {
		switch entry.Get("type").String() {
		case "compaction", "compaction_summary":
			// Compaction markers only appear in a replayed transcript.
			return true
		case "message":
			// Assistant output echoed back by the client signals full history.
			if entry.Get("role").String() == "assistant" {
				return true
			}
		}
	}
	return false
}
func normalizeJSONArrayRaw(raw []byte) string {
trimmed := strings.TrimSpace(string(raw))
if trimmed == "" {

View File

@@ -1400,3 +1400,104 @@ func TestResponsesWebsocketCompactionResetsTurnStateOnTranscriptReplacement(t *t
t.Fatalf("post-compact function call id = %s, want call-1", items[0].Get("call_id").String())
}
}
func TestInputContainsFullTranscriptDetectsAssistantMessage(t *testing.T) {
input := gjson.Parse(`[
{"type":"message","role":"user","content":"hello"},
{"type":"message","role":"assistant","content":"hi there"}
]`)
if !inputContainsFullTranscript(input) {
t.Fatal("expected full transcript when assistant message is present")
}
}
func TestInputContainsFullTranscriptDetectsCompactionItem(t *testing.T) {
for _, typ := range []string{"compaction", "compaction_summary"} {
input := gjson.Parse(`[{"type":"message","role":"user","content":"hello"},{"type":"` + typ + `","encrypted_content":"summary"}]`)
if !inputContainsFullTranscript(input) {
t.Fatalf("expected full transcript for type=%s", typ)
}
}
}
// TestInputContainsFullTranscriptFalseForIncremental checks that ordinary
// incremental inputs (a lone function_call_output, a lone user message, or an
// empty array) are not mistaken for a full transcript.
func TestInputContainsFullTranscriptFalseForIncremental(t *testing.T) {
	incrementalInputs := []string{
		`[{"type":"function_call_output","call_id":"call-1","output":"result"}]`,
		`[{"type":"message","role":"user","content":"next question"}]`,
		`[]`,
	}
	for _, raw := range incrementalInputs {
		if !inputContainsFullTranscript(gjson.Parse(raw)) {
			continue
		}
		t.Fatalf("incremental input must not be detected as full transcript: %s", raw)
	}
}
// TestNormalizeSubsequentRequestCompactSkipsMerge checks that a request whose
// input carries a compaction item is forwarded as-is: none of the items from
// the previous request or the previous response output may be merged in.
func TestNormalizeSubsequentRequestCompactSkipsMerge(t *testing.T) {
	// Stale connection state that must NOT leak into the normalized request.
	prevRequest := []byte(`{"model":"gpt-5.4","stream":true,"input":[
{"type":"message","role":"user","id":"msg-1","content":"original long prompt"},
{"type":"message","role":"assistant","id":"msg-2","content":"original long response"},
{"type":"function_call","id":"fc-1","call_id":"call-old","name":"bash","arguments":"{}"},
{"type":"function_call_output","id":"fco-1","call_id":"call-old","output":"old result"}
]}`)
	prevOutput := []byte(`[
{"type":"message","role":"assistant","id":"msg-3","content":"another assistant reply"},
{"type":"function_call","id":"fc-2","call_id":"call-stale","name":"read","arguments":"{}"}
]`)

	// Remote-compact shape (the primary Codex CLI compact scenario): user
	// messages followed by a compaction item, with no assistant message.
	request := []byte(`{"type":"response.create","input":[
{"type":"message","role":"user","id":"msg-1c","content":"compacted user msg"},
{"type":"compaction","encrypted_content":"conversation summary"}
]}`)

	normalized, _, errMsg := normalizeResponsesWebsocketRequest(request, prevRequest, prevOutput)
	if errMsg != nil {
		t.Fatalf("unexpected error: %v", errMsg.Error)
	}

	items := gjson.GetBytes(normalized, "input").Array()
	if got := len(items); got != 2 {
		t.Fatalf("input len = %d, want 2 (compacted only); stale state was not skipped", got)
	}
	if got, want := items[0].Get("id").String(), "msg-1c"; got != want {
		t.Fatalf("input[0].id = %q, want %q", got, want)
	}
	if got, want := items[1].Get("type").String(), "compaction"; got != want {
		t.Fatalf("input[1].type = %q, want %q", got, want)
	}
}
// TestNormalizeSubsequentRequestIncrementalInputStillMerges checks the regular
// incremental path: when the new input is only a function_call_output (no
// assistant message, no compaction item), the previous request input and the
// previous response output are still prepended to it, in order.
func TestNormalizeSubsequentRequestIncrementalInputStillMerges(t *testing.T) {
	prevRequest := []byte(`{"model":"gpt-5.4","stream":true,"input":[
{"type":"message","role":"user","id":"msg-1","content":"hello"}
]}`)
	prevOutput := []byte(`[
{"type":"message","role":"assistant","id":"msg-2","content":"let me check"},
{"type":"function_call","id":"fc-1","call_id":"call-1","name":"bash","arguments":"{}"}
]`)
	request := []byte(`{"type":"response.create","input":[
{"type":"function_call_output","call_id":"call-1","id":"fco-1","output":"done"}
]}`)

	normalized, _, errMsg := normalizeResponsesWebsocketRequest(request, prevRequest, prevOutput)
	if errMsg != nil {
		t.Fatalf("unexpected error: %v", errMsg.Error)
	}

	// Expected merge result: msg-1 + msg-2 + fc-1 + fco-1 = 4 items.
	items := gjson.GetBytes(normalized, "input").Array()
	if got := len(items); got != 4 {
		t.Fatalf("input len = %d, want 4 (merged)", got)
	}
	expectedIDs := []string{"msg-1", "msg-2", "fc-1", "fco-1"}
	for idx, want := range expectedIDs {
		if got := items[idx].Get("id").String(); got != want {
			t.Fatalf("input[%d].id = %q, want %q", idx, got, want)
		}
	}
}