From 6431cec7d3c12d14a5eac272a8555d82753b4dad Mon Sep 17 00:00:00 2001 From: Code_G <12405078+codeg-dev@users.noreply.github.com> Date: Mon, 6 Apr 2026 17:16:15 +0900 Subject: [PATCH 1/2] fix(claude-auth): dedupe OAuth refresh and honor 429 backoff Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- internal/auth/claude/anthropic_auth.go | 135 +++++++++++++++++++- internal/auth/claude/anthropic_auth_test.go | 123 ++++++++++++++++++ 2 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 internal/auth/claude/anthropic_auth_test.go diff --git a/internal/auth/claude/anthropic_auth.go b/internal/auth/claude/anthropic_auth.go index 12bb53ac3..b7f997efe 100644 --- a/internal/auth/claude/anthropic_auth.go +++ b/internal/auth/claude/anthropic_auth.go @@ -6,15 +6,18 @@ package claude import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" "net/url" "strings" + "sync" "time" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" ) // OAuth configuration constants for Claude/Anthropic @@ -23,8 +26,94 @@ const ( TokenURL = "https://api.anthropic.com/v1/oauth/token" ClientID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" RedirectURI = "http://localhost:54545/callback" + + claudeRefreshMinBackoff = 5 * time.Second + claudeRefreshMaxBackoff = 5 * time.Minute ) +var ( + claudeRefreshGroup singleflight.Group + claudeRefreshMu sync.Mutex + claudeRefreshBlock = make(map[string]time.Time) +) + +type refreshHTTPError struct { + status int + message string + retryable bool +} + +func (e *refreshHTTPError) Error() string { + return fmt.Sprintf("token refresh failed with status %d: %s", e.status, e.message) +} + +func (e *refreshHTTPError) Retryable() bool { + return e != nil && e.retryable +} + +func resetClaudeRefreshState() { + claudeRefreshMu.Lock() + defer claudeRefreshMu.Unlock() + claudeRefreshBlock = make(map[string]time.Time) + claudeRefreshGroup = singleflight.Group{} +} + +func claudeRefreshBlockedUntil(refreshToken string) time.Time { + claudeRefreshMu.Lock() + defer claudeRefreshMu.Unlock() + return claudeRefreshBlock[refreshToken] +} + +func setClaudeRefreshBlockedUntil(refreshToken string, until time.Time) { + claudeRefreshMu.Lock() + defer claudeRefreshMu.Unlock() + claudeRefreshBlock[refreshToken] = until +} + +func clearClaudeRefreshBlockedUntil(refreshToken string) { + claudeRefreshMu.Lock() + defer claudeRefreshMu.Unlock() + delete(claudeRefreshBlock, refreshToken) +} + +func clampClaudeRefreshBackoff(d time.Duration) time.Duration { + if d < claudeRefreshMinBackoff { + return claudeRefreshMinBackoff + } + if d > claudeRefreshMaxBackoff { + return claudeRefreshMaxBackoff + } + return d +} + +func parseClaudeRetryAfter(resp *http.Response) time.Duration { + if resp == nil { + return claudeRefreshMinBackoff + } + if raw := strings.TrimSpace(resp.Header.Get("Retry-After")); raw != "" { + if seconds, err := time.ParseDuration(raw + "s"); err == nil { + return clampClaudeRefreshBackoff(seconds) + } + if when, err := http.ParseTime(raw); err == nil { + return clampClaudeRefreshBackoff(time.Until(when)) + } + } + if raw := strings.TrimSpace(resp.Header.Get("Retry-After-Ms")); raw != "" { + if ms, err := time.ParseDuration(raw + "ms"); err == nil { + return clampClaudeRefreshBackoff(ms) + } + } + return claudeRefreshMinBackoff +} + +func isClaudeRefreshRetryable(err error) bool { + var httpErr *refreshHTTPError + if errors.As(err, &httpErr) { + return httpErr.Retryable() + } + return true +} + // tokenResponse represents the response structure from Anthropic's OAuth token endpoint. // It contains access token, refresh token, and associated user/organization information. type tokenResponse struct { @@ -222,6 +311,35 @@ func (o *ClaudeAuth) RefreshTokens(ctx context.Context, refreshToken string) (*C if refreshToken == "" { return nil, fmt.Errorf("refresh token is required") } + if blockedUntil := claudeRefreshBlockedUntil(refreshToken); blockedUntil.After(time.Now()) { + return nil, &refreshHTTPError{ + status: http.StatusTooManyRequests, + message: fmt.Sprintf("refresh temporarily blocked until %s", blockedUntil.Format(time.RFC3339)), + retryable: false, + } + } + + result, err, _ := claudeRefreshGroup.Do(refreshToken, func() (interface{}, error) { + return o.refreshTokensSingleFlight(context.WithoutCancel(ctx), refreshToken) + }) + if err != nil { + return nil, err + } + tokenData, ok := result.(*ClaudeTokenData) + if !ok || tokenData == nil { + return nil, fmt.Errorf("token refresh failed: invalid single-flight result") + } + return tokenData, nil +} + +func (o *ClaudeAuth) refreshTokensSingleFlight(ctx context.Context, refreshToken string) (*ClaudeTokenData, error) { + if blockedUntil := claudeRefreshBlockedUntil(refreshToken); blockedUntil.After(time.Now()) { + return nil, &refreshHTTPError{ + status: http.StatusTooManyRequests, + message: fmt.Sprintf("refresh temporarily blocked until %s", blockedUntil.Format(time.RFC3339)), + retryable: false, + } + } reqBody := map[string]interface{}{ "client_id": ClientID, @@ -256,7 +374,17 @@ func (o *ClaudeAuth) RefreshTokens(ctx context.Context, refreshToken string) (*C } if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("token refresh failed with status %d: %s", resp.StatusCode, string(body)) + message := string(body) + if resp.StatusCode == http.StatusTooManyRequests { + retryAfter := parseClaudeRetryAfter(resp) + setClaudeRefreshBlockedUntil(refreshToken, time.Now().Add(retryAfter)) + return nil, &refreshHTTPError{status: resp.StatusCode, message: message, retryable: false} + } + return nil, &refreshHTTPError{ + status: resp.StatusCode, + message: message, + retryable: resp.StatusCode >= http.StatusInternalServerError, + } } // log.Debugf("Token response: %s", string(body)) @@ -267,6 +395,8 @@ func (o *ClaudeAuth) RefreshTokens(ctx context.Context, refreshToken string) (*C } // Create token data + clearClaudeRefreshBlockedUntil(refreshToken) + return &ClaudeTokenData{ AccessToken: tokenResp.AccessToken, RefreshToken: tokenResp.RefreshToken, @@ -328,6 +458,9 @@ func (o *ClaudeAuth) RefreshTokensWithRetry(ctx context.Context, refreshToken st lastErr = err log.Warnf("Token refresh attempt %d failed: %v", attempt+1, err) + if !isClaudeRefreshRetryable(err) { + break + } } return nil, fmt.Errorf("token refresh failed after %d attempts: %w", maxRetries, lastErr) diff --git a/internal/auth/claude/anthropic_auth_test.go b/internal/auth/claude/anthropic_auth_test.go new file mode 100644 index 000000000..0b14d0834 --- /dev/null +++ b/internal/auth/claude/anthropic_auth_test.go @@ -0,0 +1,123 @@ +package claude + +import ( + "context" + "io" + "net/http" + "strings" + "sync" + "sync/atomic" + "testing" + "time" +) + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func TestRefreshTokensWithRetry_429BlocksImmediateReplay(t *testing.T) { + resetClaudeRefreshState() + defer resetClaudeRefreshState() + + var calls int32 + auth := &ClaudeAuth{ + httpClient: &http.Client{ + Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + atomic.AddInt32(&calls, 1) + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(strings.NewReader(`{"error":"rate_limited"}`)), + Header: http.Header{"Retry-After": []string{"60"}}, + Request: req, + }, nil + }), + }, + } + + _, err := auth.RefreshTokensWithRetry(context.Background(), "dummy_refresh_token", 3) + if err == nil { + t.Fatalf("expected 429 refresh error") + } + if !strings.Contains(err.Error(), "status 429") { + t.Fatalf("expected status 429 in error, got %v", err) + } + if got := atomic.LoadInt32(&calls); got != 1 { + t.Fatalf("expected 1 refresh attempt after 429, got %d", got) + } + + _, err = auth.RefreshTokensWithRetry(context.Background(), "dummy_refresh_token", 3) + if err == nil { + t.Fatalf("expected immediate blocked refresh error") + } + if got := atomic.LoadInt32(&calls); got != 1 { + t.Fatalf("expected blocked retry to avoid a second refresh call, got %d attempts", got) + } + if blockedUntil := claudeRefreshBlockedUntil("dummy_refresh_token"); !blockedUntil.After(time.Now()) { + t.Fatalf("expected blocked-until timestamp to be set, got %v", blockedUntil) + } +} + +func TestRefreshTokens_DeduplicatesConcurrentRefresh(t *testing.T) { + resetClaudeRefreshState() + defer resetClaudeRefreshState() + + var calls int32 + started := make(chan struct{}) + release := make(chan struct{}) + var once sync.Once + + auth := &ClaudeAuth{ + httpClient: &http.Client{ + Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + atomic.AddInt32(&calls, 1) + once.Do(func() { close(started) }) + <-release + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "access_token":"new-access", + "refresh_token":"new-refresh", + "token_type":"Bearer", + "expires_in":3600, + "account":{"email_address":"shared@example.com"} + }`)), + Header: make(http.Header), + Request: req, + }, nil + }), + }, + } + + results := make(chan *ClaudeTokenData, 2) + errs := make(chan error, 2) + runRefresh := func() { + td, err := auth.RefreshTokens(context.Background(), "shared-refresh-token") + results <- td + errs <- err + } + + go runRefresh() + go runRefresh() + + <-started + time.Sleep(20 * time.Millisecond) + if got := atomic.LoadInt32(&calls); got != 1 { + t.Fatalf("expected concurrent refresh to share a single upstream call, got %d", got) + } + close(release) + + for i := 0; i < 2; i++ { + if err := <-errs; err != nil { + t.Fatalf("expected refresh to succeed, got %v", err) + } + td := <-results + if td == nil || td.AccessToken != "new-access" { + t.Fatalf("expected refreshed access token, got %#v", td) + } + } + if got := atomic.LoadInt32(&calls); got != 1 { + t.Fatalf("expected exactly 1 upstream refresh call, got %d", got) + } +} From 29e32aaab940f0681b9f9a9b2da94b81d2e5d098 Mon Sep 17 00:00:00 2001 From: Code_G <12405078+codeg-dev@users.noreply.github.com> Date: Mon, 6 Apr 2026 17:16:42 +0900 Subject: [PATCH 2/2] fix(executor): route Claude refresh through retry-aware auth Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- internal/runtime/executor/claude_executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 7b2e5d8d5..93487311f 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -594,7 +594,7 @@ func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) ( return auth, nil } svc := claudeauth.NewClaudeAuth(e.cfg) - td, err := svc.RefreshTokens(ctx, refreshToken) + td, err := svc.RefreshTokensWithRetry(ctx, refreshToken, 3) if err != nil { return nil, err }