From ad868308c0a499185b3c4341e4a5fc6f91661467 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Tue, 19 May 2026 11:56:28 +0800 Subject: [PATCH] fix codex context length stream errors --- internal/runtime/executor/codex_executor.go | 111 +++++++++++++ .../codex_executor_stream_output_test.go | 123 ++++++++++++++ sdk/api/handlers/claude/code_handlers.go | 154 +++++++++++++++++- .../claude/code_handlers_error_test.go | 94 +++++++++++ 4 files changed, 480 insertions(+), 2 deletions(-) create mode 100644 sdk/api/handlers/claude/code_handlers_error_test.go diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 9d98df546..3db2100f9 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -100,6 +100,103 @@ func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][] return completedDataPatched } +func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) { + eventType := gjson.GetBytes(eventData, "type").String() + var body []byte + switch eventType { + case "error": + body = codexTerminalErrorBody(eventData, "error") + if len(body) == 0 { + body = codexTerminalTopLevelErrorBody(eventData) + } + case "response.failed": + body = codexTerminalErrorBody(eventData, "response.error") + if len(body) == 0 { + body = codexTerminalErrorBody(eventData, "error") + } + default: + return statusErr{}, false + } + if len(body) == 0 { + return statusErr{}, false + } + if !codexTerminalErrorIsContextLength(body) { + return statusErr{}, false + } + return newCodexStatusErr(http.StatusBadRequest, body), true +} + +func codexTerminalErrorBody(eventData []byte, path string) []byte { + errorResult := gjson.GetBytes(eventData, path) + if !errorResult.Exists() { + return nil + } + body := []byte(`{"error":{}}`) + if errorResult.Type == gjson.JSON { + body, _ = sjson.SetRawBytes(body, "error", []byte(errorResult.Raw)) + } else if message := strings.TrimSpace(errorResult.String()); message != "" { + body, _ = sjson.SetBytes(body, "error.message", message) + } + if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" { + if message := strings.TrimSpace(gjson.GetBytes(eventData, "response.error.message").String()); message != "" { + body, _ = sjson.SetBytes(body, "error.message", message) + } + } + if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" { + if code := strings.TrimSpace(gjson.GetBytes(body, "error.code").String()); code != "" { + body, _ = sjson.SetBytes(body, "error.message", code) + } + } + if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" { + if errorType := strings.TrimSpace(gjson.GetBytes(body, "error.type").String()); errorType != "" { + body, _ = sjson.SetBytes(body, "error.message", errorType) + } + } + return body +} + +func codexTerminalTopLevelErrorBody(eventData []byte) []byte { + message := strings.TrimSpace(gjson.GetBytes(eventData, "message").String()) + code := strings.TrimSpace(gjson.GetBytes(eventData, "code").String()) + errorType := strings.TrimSpace(gjson.GetBytes(eventData, "error_type").String()) + param := strings.TrimSpace(gjson.GetBytes(eventData, "param").String()) + if message == "" && code == "" && errorType == "" && param == "" { + return nil + } + + body := []byte(`{"error":{}}`) + if message != "" { + body, _ = sjson.SetBytes(body, "error.message", message) + } + if code != "" { + body, _ = sjson.SetBytes(body, "error.code", code) + } + if errorType != "" { + body, _ = sjson.SetBytes(body, "error.type", errorType) + } + if param != "" { + body, _ = sjson.SetBytes(body, "error.param", param) + } + if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" { + if code != "" { + body, _ = sjson.SetBytes(body, "error.message", code) + } else if errorType != "" { + body, _ = sjson.SetBytes(body, "error.message", errorType) + } + } + return body +} + +func codexTerminalErrorIsContextLength(body []byte) bool { + errorCode := strings.ToLower(strings.TrimSpace(gjson.GetBytes(body, "error.code").String())) + message := strings.ToLower(strings.TrimSpace(gjson.GetBytes(body, "error.message").String())) + return errorCode == "context_length_exceeded" || + errorCode == "context_too_large" || + strings.Contains(message, "context window") || + strings.Contains(message, "context length") || + strings.Contains(message, "too many tokens") +} + // CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint). // If api_key is unavailable on auth, it falls back to legacy via ClientAdapter. type CodexExecutor struct { @@ -249,6 +346,11 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re eventData := bytes.TrimSpace(line[5:]) eventType := gjson.GetBytes(eventData, "type").String() + if streamErr, ok := codexTerminalStreamContextLengthErr(eventData); ok { + err = streamErr + return resp, err + } + if eventType == "response.output_item.done" { itemResult := gjson.GetBytes(eventData, "item") if !itemResult.Exists() || itemResult.Type != gjson.JSON { @@ -506,6 +608,15 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au if bytes.HasPrefix(line, dataTag) { data := bytes.TrimSpace(line[5:]) + if streamErr, ok := codexTerminalStreamContextLengthErr(data); ok { + helps.RecordAPIResponseError(ctx, e.cfg, streamErr) + reporter.PublishFailure(ctx, streamErr) + select { + case out <- cliproxyexecutor.StreamChunk{Err: streamErr}: + case <-ctx.Done(): + } + return + } switch gjson.GetBytes(data, "type").String() { case "response.output_item.done": collectCodexOutputItemDone(data, outputItemsByIndex, &outputItemsFallback) diff --git a/internal/runtime/executor/codex_executor_stream_output_test.go b/internal/runtime/executor/codex_executor_stream_output_test.go index b814c3e96..983f915bc 100644 --- a/internal/runtime/executor/codex_executor_stream_output_test.go +++ b/internal/runtime/executor/codex_executor_stream_output_test.go @@ -5,6 +5,7 @@ import ( "context" "net/http" "net/http/httptest" + "strings" "testing" "github.com/router-for-me/CLIProxyAPI/v7/internal/config" @@ -46,6 +47,128 @@ func TestCodexExecutorExecute_EmptyStreamCompletionOutputUsesOutputItemDone(t *t } } +func TestCodexExecutorExecuteSurfacesTerminalStreamError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte("event: response.created\n")) + _, _ = w.Write([]byte(`data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.5"}}` + "\n\n")) + _, _ = w.Write([]byte("event: error\n")) + _, _ = w.Write([]byte(`data: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","param":"input"},"sequence_number":2}` + "\n\n")) + _, _ = w.Write([]byte("event: response.failed\n")) + _, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again."}}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }} + + _, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.5", + Payload: []byte(`{"model":"gpt-5.5","input":"hello"}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai-response"), + Stream: false, + }) + if err == nil { + t.Fatal("expected terminal stream error, got nil") + } + if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest { + t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err) + } + assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large") + if !strings.Contains(err.Error(), "Your input exceeds the context window") { + t.Fatalf("error message missing upstream context text: %v", err) + } +} + +func TestCodexExecutorExecuteStreamSurfacesTerminalStreamError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte("event: response.created\n")) + _, _ = w.Write([]byte(`data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.5"}}` + "\n\n")) + _, _ = w.Write([]byte("event: error\n")) + _, _ = w.Write([]byte(`data: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","param":"input"},"sequence_number":2}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }} + + result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.5", + Payload: []byte(`{"model":"gpt-5.5","input":"hello"}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai-response"), + Stream: true, + }) + if err != nil { + t.Fatalf("ExecuteStream error: %v", err) + } + + var streamErr error + for chunk := range result.Chunks { + if chunk.Err != nil { + streamErr = chunk.Err + break + } + } + if streamErr == nil { + t.Fatal("missing stream terminal error") + } + if got := statusCodeFromTestError(t, streamErr); got != http.StatusBadRequest { + t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, streamErr) + } + assertCodexErrorCode(t, streamErr.Error(), "invalid_request_error", "context_too_large") +} + +func TestCodexTerminalStreamContextLengthErrFromResponseFailed(t *testing.T) { + err, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again."}}}`)) + if !ok { + t.Fatal("expected context length terminal error") + } + if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest { + t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err) + } + assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large") +} + +func TestCodexTerminalStreamContextLengthErrFromTopLevelError(t *testing.T) { + err, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","sequence_number":2}`)) + if !ok { + t.Fatal("expected top-level context length terminal error") + } + if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest { + t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err) + } + assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large") + if !strings.Contains(err.Error(), "Your input exceeds the context window") { + t.Fatalf("error message missing upstream context text: %v", err) + } +} + +func TestCodexTerminalStreamContextLengthErrIgnoresOtherTerminalErrors(t *testing.T) { + _, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"error","error":{"type":"rate_limit_error","code":"rate_limit_exceeded","message":"Rate limit reached."}}`)) + if ok { + t.Fatal("rate limit terminal error should not be handled by context length fix") + } +} + +func statusCodeFromTestError(t *testing.T, err error) int { + t.Helper() + + statusErr, ok := err.(interface{ StatusCode() int }) + if !ok { + t.Fatalf("error %T does not expose StatusCode(): %v", err, err) + } + return statusErr.StatusCode() +} + func TestCodexExecutorExecuteStream_EmptyStreamCompletionOutputUsesOutputItemDone(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") diff --git a/sdk/api/handlers/claude/code_handlers.go b/sdk/api/handlers/claude/code_handlers.go index 464f385eb..4724a7277 100644 --- a/sdk/api/handlers/claude/code_handlers.go +++ b/sdk/api/handlers/claude/code_handlers.go @@ -14,6 +14,8 @@ import ( "fmt" "io" "net/http" + "strings" + "time" "github.com/gin-gonic/gin" . "github.com/router-for-me/CLIProxyAPI/v7/internal/constant" @@ -257,6 +259,15 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [ return case chunk, ok := <-dataChan: if !ok { + if errMsg, okPendingErr := pendingClaudeStreamError(errChan); okPendingErr { + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + } // Stream closed without data? Send DONE or just headers. setSSEHeaders() handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) @@ -282,6 +293,21 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [ } } +func pendingClaudeStreamError(errs <-chan *interfaces.ErrorMessage) (*interfaces.ErrorMessage, bool) { + if errs == nil { + return nil, false + } + select { + case errMsg, ok := <-errs: + if !ok { + return nil, false + } + return errMsg, true + default: + return nil, false + } +} + func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) { h.ForwardStream(c, flusher, cancel, data, errs, handlers.StreamForwardOptions{ WriteChunk: func(chunk []byte) { @@ -317,11 +343,135 @@ type claudeErrorResponse struct { } func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse { + status := http.StatusInternalServerError + errText := http.StatusText(status) + if msg != nil { + if msg.StatusCode > 0 { + status = msg.StatusCode + errText = http.StatusText(status) + } + if msg.Error != nil { + if v := strings.TrimSpace(msg.Error.Error()); v != "" { + errText = v + } + } + } + errType, message := claudeErrorDetailFromText(status, errText) return claudeErrorResponse{ Type: "error", Error: claudeErrorDetail{ - Type: "api_error", - Message: msg.Error.Error(), + Type: errType, + Message: message, }, } } + +func (h *ClaudeCodeAPIHandler) WriteErrorResponse(c *gin.Context, msg *interfaces.ErrorMessage) { + status := http.StatusInternalServerError + if msg != nil && msg.StatusCode > 0 { + status = msg.StatusCode + } + if msg != nil && msg.Addon != nil && handlers.PassthroughHeadersEnabled(h.Cfg) { + for key, values := range msg.Addon { + if len(values) == 0 { + continue + } + c.Writer.Header().Del(key) + for _, value := range values { + c.Writer.Header().Add(key, value) + } + } + } + + body, err := json.Marshal(h.toClaudeError(msg)) + if err != nil { + body = []byte(`{"type":"error","error":{"type":"api_error","message":"Internal Server Error"}}`) + } + appendClaudeAPIResponse(c, body) + if !c.Writer.Written() { + c.Writer.Header().Set("Content-Type", "application/json") + } + c.Status(status) + _, _ = c.Writer.Write(body) +} + +func claudeErrorDetailFromText(status int, errText string) (string, string) { + message := strings.TrimSpace(errText) + if message == "" { + message = http.StatusText(status) + } + errType := claudeErrorTypeFromStatus(status) + + var payload map[string]any + if json.Valid([]byte(message)) { + if err := json.Unmarshal([]byte(message), &payload); err == nil { + if e, ok := payload["error"].(map[string]any); ok { + if t, ok := e["type"].(string); ok && strings.TrimSpace(t) != "" { + errType = strings.TrimSpace(t) + } + if m, ok := e["message"].(string); ok && strings.TrimSpace(m) != "" { + message = strings.TrimSpace(m) + } else if c, ok := e["code"].(string); ok && strings.TrimSpace(c) != "" { + message = strings.TrimSpace(c) + } + } else { + if t, ok := payload["type"].(string); ok && strings.TrimSpace(t) != "" && strings.TrimSpace(t) != "error" { + errType = strings.TrimSpace(t) + } + if m, ok := payload["message"].(string); ok && strings.TrimSpace(m) != "" { + message = strings.TrimSpace(m) + } + } + } + } + + return errType, message +} + +func claudeErrorTypeFromStatus(status int) string { + switch status { + case http.StatusUnauthorized: + return "authentication_error" + case http.StatusPaymentRequired: + return "billing_error" + case http.StatusForbidden: + return "permission_error" + case http.StatusNotFound: + return "not_found_error" + case http.StatusRequestEntityTooLarge: + return "request_too_large" + case http.StatusTooManyRequests: + return "rate_limit_error" + case http.StatusGatewayTimeout: + return "timeout_error" + case 529: + return "overloaded_error" + default: + if status >= http.StatusInternalServerError { + return "api_error" + } + return "invalid_request_error" + } +} + +func appendClaudeAPIResponse(c *gin.Context, data []byte) { + if c == nil || len(data) == 0 { + return + } + if _, exists := c.Get("API_RESPONSE_TIMESTAMP"); !exists { + c.Set("API_RESPONSE_TIMESTAMP", time.Now()) + } + if existing, exists := c.Get("API_RESPONSE"); exists { + if existingBytes, ok := existing.([]byte); ok && len(existingBytes) > 0 { + combined := make([]byte, 0, len(existingBytes)+len(data)+1) + combined = append(combined, existingBytes...) + if existingBytes[len(existingBytes)-1] != '\n' { + combined = append(combined, '\n') + } + combined = append(combined, data...) + c.Set("API_RESPONSE", combined) + return + } + } + c.Set("API_RESPONSE", bytes.Clone(data)) +} diff --git a/sdk/api/handlers/claude/code_handlers_error_test.go b/sdk/api/handlers/claude/code_handlers_error_test.go new file mode 100644 index 000000000..5ba9dd061 --- /dev/null +++ b/sdk/api/handlers/claude/code_handlers_error_test.go @@ -0,0 +1,94 @@ +package claude + +import ( + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/router-for-me/CLIProxyAPI/v7/internal/interfaces" + "github.com/tidwall/gjson" +) + +func TestClaudeErrorExtractsOpenAIStyleUpstreamJSON(t *testing.T) { + handler := &ClaudeCodeAPIHandler{} + msg := &interfaces.ErrorMessage{ + StatusCode: http.StatusBadRequest, + Error: errors.New(`{"error":{"message":"Your input exceeds the context window of this model. Please adjust your input and try again.","type":"invalid_request_error","code":"context_too_large"}}`), + } + + got := handler.toClaudeError(msg) + + if got.Type != "error" { + t.Fatalf("type = %q, want error", got.Type) + } + if got.Error.Type != "invalid_request_error" { + t.Fatalf("error.type = %q, want invalid_request_error", got.Error.Type) + } + if got.Error.Message != "Your input exceeds the context window of this model. Please adjust your input and try again." { + t.Fatalf("error.message = %q", got.Error.Message) + } +} + +func TestClaudeErrorExtractsClaudeStyleUpstreamJSON(t *testing.T) { + handler := &ClaudeCodeAPIHandler{} + msg := &interfaces.ErrorMessage{ + StatusCode: http.StatusTooManyRequests, + Error: errors.New(`{"type":"error","error":{"type":"rate_limit_error","message":"This request would exceed your account's rate limit. Please try again later."},"request_id":"req_123"}`), + } + + got := handler.toClaudeError(msg) + + if got.Error.Type != "rate_limit_error" { + t.Fatalf("error.type = %q, want rate_limit_error", got.Error.Type) + } + if got.Error.Message != "This request would exceed your account's rate limit. Please try again later." { + t.Fatalf("error.message = %q", got.Error.Message) + } +} + +func TestWriteClaudeErrorResponseUsesClaudeEnvelope(t *testing.T) { + gin.SetMode(gin.TestMode) + recorder := httptest.NewRecorder() + c, _ := gin.CreateTestContext(recorder) + handler := &ClaudeCodeAPIHandler{} + msg := &interfaces.ErrorMessage{ + StatusCode: http.StatusBadRequest, + Error: errors.New(`{"error":{"message":"Your input exceeds the context window of this model. Please adjust your input and try again.","type":"invalid_request_error","code":"context_too_large"}}`), + } + + handler.WriteErrorResponse(c, msg) + + if recorder.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", recorder.Code, http.StatusBadRequest) + } + body := recorder.Body.Bytes() + if got := gjson.GetBytes(body, "type").String(); got != "error" { + t.Fatalf("type = %q, want error; body=%s", got, body) + } + if got := gjson.GetBytes(body, "error.type").String(); got != "invalid_request_error" { + t.Fatalf("error.type = %q, want invalid_request_error; body=%s", got, body) + } + if got := gjson.GetBytes(body, "error.message").String(); got != "Your input exceeds the context window of this model. Please adjust your input and try again." { + t.Fatalf("error.message = %q; body=%s", got, body) + } +} + +func TestPendingClaudeStreamErrorUsesBufferedError(t *testing.T) { + wantErr := &interfaces.ErrorMessage{ + StatusCode: http.StatusBadRequest, + Error: errors.New(`{"error":{"message":"Your input exceeds the context window of this model. Please adjust your input and try again.","type":"invalid_request_error","code":"context_too_large"}}`), + } + errs := make(chan *interfaces.ErrorMessage, 1) + errs <- wantErr + close(errs) + + gotErr, ok := pendingClaudeStreamError(errs) + if !ok { + t.Fatal("expected pending stream error") + } + if gotErr != wantErr { + t.Fatalf("pending error = %p, want %p", gotErr, wantErr) + } +}