diff --git a/internal/auth/cursor/proto/connect.go b/internal/auth/cursor/proto/connect.go index db9e4288..ffe5905e 100644 --- a/internal/auth/cursor/proto/connect.go +++ b/internal/auth/cursor/proto/connect.go @@ -41,8 +41,21 @@ func ParseConnectFrame(buf []byte) (flags byte, payload []byte, consumed int, ok return flags, buf[5:total], total, true } +// ConnectError is a structured error from the Connect protocol end-of-stream trailer. +// The Code field contains the server-defined error code (e.g. gRPC standard codes +// like "resource_exhausted", "unauthenticated", "permission_denied", "unavailable"). +type ConnectError struct { + Code string // server-defined error code + Message string // human-readable error description +} + +func (e *ConnectError) Error() string { + return fmt.Sprintf("Connect error %s: %s", e.Code, e.Message) +} + // ParseConnectEndStream parses a Connect end-of-stream frame payload (JSON). // Returns nil if there is no error in the trailer. +// On error, returns a *ConnectError with the server's error code and message. func ParseConnectEndStream(data []byte) error { if len(data) == 0 { return nil @@ -65,7 +78,7 @@ func ParseConnectEndStream(data []byte) error { if msg == "" { msg = "Unknown error" } - return fmt.Errorf("Connect error %s: %s", code, msg) + return &ConnectError{Code: code, Message: msg} } return nil } diff --git a/internal/runtime/executor/cursor_executor.go b/internal/runtime/executor/cursor_executor.go index 2f34ee05..73335f50 100644 --- a/internal/runtime/executor/cursor_executor.go +++ b/internal/runtime/executor/cursor_executor.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/sha256" + "errors" "crypto/tls" "encoding/base64" "encoding/hex" @@ -140,6 +141,60 @@ func (e *CursorExecutor) findSessionByConversationLocked(convId string) string { return "" } +// cursorStatusErr implements the StatusError and RetryAfter interfaces so the +// conductor can classify Cursor errors (e.g. 429 → quota cooldown). +type cursorStatusErr struct { + code int + msg string +} + +func (e cursorStatusErr) Error() string { return e.msg } +func (e cursorStatusErr) StatusCode() int { return e.code } +func (e cursorStatusErr) RetryAfter() *time.Duration { return nil } // no retry-after info from Cursor; conductor uses exponential backoff + +// classifyCursorError maps Cursor Connect/H2 errors to HTTP status codes. +// Layer 1: precise match on ConnectError.Code (gRPC standard codes). +// Layer 2: fuzzy string match for H2 frame errors and unknown formats. +// Unclassified errors pass through unchanged. +func classifyCursorError(err error) error { + if err == nil { + return nil + } + + // Layer 1: structured ConnectError from ParseConnectEndStream + var ce *cursorproto.ConnectError + if errors.As(err, &ce) { + log.Infof("cursor: Connect error code=%q message=%q", ce.Code, ce.Message) + switch ce.Code { + case "resource_exhausted": + return cursorStatusErr{code: 429, msg: err.Error()} + case "unauthenticated": + return cursorStatusErr{code: 401, msg: err.Error()} + case "permission_denied": + return cursorStatusErr{code: 403, msg: err.Error()} + case "unavailable": + return cursorStatusErr{code: 503, msg: err.Error()} + case "internal": + return cursorStatusErr{code: 500, msg: err.Error()} + default: + // Unknown Connect code — log for observation, treat as 502 + return cursorStatusErr{code: 502, msg: err.Error()} + } + } + + // Layer 2: fuzzy match for H2 errors and unstructured messages + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "rate limit") || strings.Contains(msg, "quota") || + strings.Contains(msg, "too many"): + return cursorStatusErr{code: 429, msg: err.Error()} + case strings.Contains(msg, "rst_stream") || strings.Contains(msg, "goaway"): + return cursorStatusErr{code: 502, msg: err.Error()} + } + + return err +} + // PrepareRequest implements ProviderExecutor (for HttpRequest support). func (e *CursorExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error { token := cursorAccessToken(auth) @@ -273,7 +328,7 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r nil, // tokenUsage - non-streaming nil, // onCheckpoint - non-streaming doesn't persist ); streamErr != nil && fullText.Len() == 0 { - return resp, fmt.Errorf("cursor: stream error: %w", streamErr) + return resp, classifyCursorError(fmt.Errorf("cursor: stream error: %w", streamErr)) } id := "chatcmpl-" + uuid.New().String()[:28] @@ -680,7 +735,7 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // return an error so the conductor retries with a different auth. select { case streamErr := <-streamErrCh: - return nil, fmt.Errorf("cursor: stream failed before response: %w", streamErr) + return nil, classifyCursorError(fmt.Errorf("cursor: stream failed before response: %w", streamErr)) case <-firstChunkSent: // Data started flowing — return stream to client return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil