From 66c3dae06b2ae5db101683ce3ef76b30361d55c0 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sun, 10 May 2026 01:30:43 +0800 Subject: [PATCH] feat(home): implement `count` for home auth dispatch requests and enable usage statistics - Added `count` attribute to `homeAuthCount` requests to improve home message batching. - Enabled usage statistics for home mode by default and added config-level enforcement. - Adjusted failure logging to include detailed metadata in `UsageReporter`. - Updated multiple executors to pass error details to `PublishFailure` for better debugging. - Enhanced unit tests to validate `count` behavior and usage statistics enforcement across components. --- cmd/server/main.go | 1 + internal/home/client.go | 22 +++-- internal/home/client_test.go | 32 +++++++ internal/home/requests.go | 1 + internal/redisqueue/plugin.go | 25 ++++++ internal/redisqueue/plugin_test.go | 44 +++++++++- .../runtime/executor/aistudio_executor.go | 4 +- .../runtime/executor/antigravity_executor.go | 4 +- internal/runtime/executor/claude_executor.go | 4 +- internal/runtime/executor/codex_executor.go | 2 +- .../executor/codex_websockets_executor.go | 6 +- .../runtime/executor/gemini_cli_executor.go | 4 +- internal/runtime/executor/gemini_executor.go | 2 +- .../executor/gemini_vertex_executor.go | 4 +- .../runtime/executor/helps/usage_helpers.go | 51 ++++++++---- internal/runtime/executor/kimi_executor.go | 2 +- .../executor/openai_compat_executor.go | 4 +- sdk/cliproxy/auth/conductor.go | 83 ++++++++++++++++--- sdk/cliproxy/service.go | 2 + sdk/cliproxy/service_stale_state_test.go | 29 +++++++ sdk/cliproxy/usage/manager.go | 7 ++ 21 files changed, 281 insertions(+), 52 deletions(-) create mode 100644 internal/home/client_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 481103809..1ef830066 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -290,6 +290,7 @@ func main() { } parsed.Home = homeCfg parsed.Port = 8317 // Default to 8317 for home mode, can be overridden by home config + parsed.UsageStatisticsEnabled = true cfg = parsed // Keep a non-empty config path for downstream components (log paths, management assets, etc), diff --git a/internal/home/client.go b/internal/home/client.go index e99ef7532..23082cc69 100644 --- a/internal/home/client.go +++ b/internal/home/client.go @@ -190,7 +190,20 @@ func headersToLowerMap(headers http.Header) map[string]string { return out } -func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID string, headers http.Header) ([]byte, error) { +func newAuthDispatchRequest(requestedModel string, sessionID string, headers http.Header, count int) authDispatchRequest { + if count <= 0 { + count = 1 + } + return authDispatchRequest{ + Type: "auth", + Model: requestedModel, + Count: count, + SessionID: strings.TrimSpace(sessionID), + Headers: headersToLowerMap(headers), + } +} + +func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID string, headers http.Header, count int) ([]byte, error) { if err := c.ensureClients(); err != nil { return nil, err } @@ -198,12 +211,7 @@ func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID if requestedModel == "" { return nil, fmt.Errorf("home: requested model is empty") } - req := authDispatchRequest{ - Type: "auth", - Model: requestedModel, - SessionID: strings.TrimSpace(sessionID), - Headers: headersToLowerMap(headers), - } + req := newAuthDispatchRequest(requestedModel, sessionID, headers, count) keyBytes, err := json.Marshal(&req) if err != nil { return nil, err diff --git a/internal/home/client_test.go b/internal/home/client_test.go new file mode 100644 index 000000000..625e77bca --- /dev/null +++ b/internal/home/client_test.go @@ -0,0 +1,32 @@ +package home + +import ( + "encoding/json" + "net/http" + "testing" +) + +func TestAuthDispatchRequestIncludesCount(t *testing.T) { + req := newAuthDispatchRequest("gpt-5.4", "session-1", http.Header{"Authorization": {"Bearer test"}}, 2) + + raw, err := json.Marshal(&req) + if err != nil { + t.Fatalf("marshal auth dispatch request: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(raw, &payload); err != nil { + t.Fatalf("unmarshal auth dispatch request: %v", err) + } + if got := int(payload["count"].(float64)); got != 2 { + t.Fatalf("count = %d, want 2", got) + } +} + +func TestAuthDispatchRequestDefaultsCountToOne(t *testing.T) { + req := newAuthDispatchRequest("gpt-5.4", "", nil, 0) + + if req.Count != 1 { + t.Fatalf("count = %d, want 1", req.Count) + } +} diff --git a/internal/home/requests.go b/internal/home/requests.go index d08f5a5d9..075776646 100644 --- a/internal/home/requests.go +++ b/internal/home/requests.go @@ -3,6 +3,7 @@ package home type authDispatchRequest struct { Type string `json:"type"` Model string `json:"model"` + Count int `json:"count"` SessionID string `json:"session_id,omitempty"` Headers map[string]string `json:"headers,omitempty"` } diff --git a/internal/redisqueue/plugin.go b/internal/redisqueue/plugin.go index 8a99de83b..e5b74cb24 100644 --- a/internal/redisqueue/plugin.go +++ b/internal/redisqueue/plugin.go @@ -66,6 +66,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec if !failed { failed = !resolveSuccess(ctx) } + fail := resolveFail(ctx, record, failed) detail := requestDetail{ Timestamp: timestamp, @@ -74,6 +75,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec AuthIndex: record.AuthIndex, Tokens: tokens, Failed: failed, + Fail: fail, } payload, err := json.Marshal(queuedUsageDetail{ @@ -110,6 +112,7 @@ type requestDetail struct { AuthIndex string `json:"auth_index"` Tokens tokenStats `json:"tokens"` Failed bool `json:"failed"` + Fail failDetail `json:"fail"` } type tokenStats struct { @@ -120,6 +123,28 @@ type tokenStats struct { TotalTokens int64 `json:"total_tokens"` } +type failDetail struct { + StatusCode int `json:"status_code"` + Body string `json:"body"` +} + +func resolveFail(ctx context.Context, record coreusage.Record, failed bool) failDetail { + fail := failDetail{ + StatusCode: record.Fail.StatusCode, + Body: strings.TrimSpace(record.Fail.Body), + } + if !failed { + return failDetail{StatusCode: 200} + } + if fail.StatusCode <= 0 { + fail.StatusCode = internallogging.GetResponseStatus(ctx) + } + if fail.StatusCode <= 0 { + fail.StatusCode = 500 + } + return fail +} + func resolveSuccess(ctx context.Context) bool { status := internallogging.GetResponseStatus(ctx) if status == 0 { diff --git a/internal/redisqueue/plugin_test.go b/internal/redisqueue/plugin_test.go index 4d7cb4652..e2af6af70 100644 --- a/internal/redisqueue/plugin_test.go +++ b/internal/redisqueue/plugin_test.go @@ -44,9 +44,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { requireStringField(t, payload, "alias", "client-gpt") requireStringField(t, payload, "endpoint", "POST /v1/chat/completions") requireStringField(t, payload, "auth_type", "apikey") - requireStringField(t, payload, "user_api_key", "test-key") + requireMissingField(t, payload, "user_api_key") requireStringField(t, payload, "request_id", "ctx-request-id") requireBoolField(t, payload, "failed", false) + requireFailField(t, payload, http.StatusOK, "") }) } @@ -68,6 +69,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t Source: "user@example.com", RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC), Latency: 2500 * time.Millisecond, + Fail: coreusage.Failure{ + StatusCode: http.StatusInternalServerError, + Body: "upstream failed", + }, Detail: coreusage.Detail{ InputTokens: 10, OutputTokens: 20, @@ -81,9 +86,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t requireStringField(t, payload, "alias", "client-mini") requireStringField(t, payload, "endpoint", "GET /v1/responses") requireStringField(t, payload, "auth_type", "apikey") - requireStringField(t, payload, "user_api_key", "test-key") + requireMissingField(t, payload, "user_api_key") requireStringField(t, payload, "request_id", "gin-request-id") requireBoolField(t, payload, "failed", true) + requireFailField(t, payload, http.StatusInternalServerError, "upstream failed") }) } @@ -115,6 +121,10 @@ func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) { Source: "user@example.com", RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC), Latency: 1500 * time.Millisecond, + Fail: coreusage.Failure{ + StatusCode: http.StatusBadGateway, + Body: "bad gateway", + }, Detail: coreusage.Detail{ InputTokens: 10, OutputTokens: 20, @@ -125,9 +135,10 @@ func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) { payload := waitForSinglePayload(t, 2*time.Second) requireStringField(t, payload, "endpoint", "POST /v1/chat/completions") requireStringField(t, payload, "alias", "client-gpt") - requireStringField(t, payload, "user_api_key", "test-key") + requireMissingField(t, payload, "user_api_key") requireStringField(t, payload, "request_id", "ctx-request-id") requireBoolField(t, payload, "failed", true) + requireFailField(t, payload, http.StatusBadGateway, "bad gateway") }) } @@ -217,6 +228,14 @@ func requireStringField(t *testing.T, payload map[string]json.RawMessage, key, w } } +func requireMissingField(t *testing.T, payload map[string]json.RawMessage, key string) { + t.Helper() + + if _, ok := payload[key]; ok { + t.Fatalf("payload unexpectedly contains %q", key) + } +} + type pluginFunc func(context.Context, coreusage.Record) func (fn pluginFunc) HandleUsage(ctx context.Context, record coreusage.Record) { @@ -238,3 +257,22 @@ func requireBoolField(t *testing.T, payload map[string]json.RawMessage, key stri t.Fatalf("%s = %t, want %t", key, got, want) } } + +func requireFailField(t *testing.T, payload map[string]json.RawMessage, wantStatus int, wantBody string) { + t.Helper() + + raw, ok := payload["fail"] + if !ok { + t.Fatalf("payload missing %q", "fail") + } + var got struct { + StatusCode int `json:"status_code"` + Body string `json:"body"` + } + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal fail: %v", err) + } + if got.StatusCode != wantStatus || got.Body != wantBody { + t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", got.StatusCode, got.Body, wantStatus, wantBody) + } +} diff --git a/internal/runtime/executor/aistudio_executor.go b/internal/runtime/executor/aistudio_executor.go index 392109b5c..41365b5f7 100644 --- a/internal/runtime/executor/aistudio_executor.go +++ b/internal/runtime/executor/aistudio_executor.go @@ -284,7 +284,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth processEvent := func(event wsrelay.StreamEvent) bool { if event.Err != nil { helps.RecordAPIResponseError(ctx, e.cfg, event.Err) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, event.Err) select { case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: case <-ctx.Done(): @@ -336,7 +336,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth return false case wsrelay.MessageTypeError: helps.RecordAPIResponseError(ctx, e.cfg, event.Err) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, event.Err) select { case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: case <-ctx.Done(): diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 84ff9de08..2f8dff927 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -898,7 +898,7 @@ attemptLoop: } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) out <- cliproxyexecutor.StreamChunk{Err: errScan} } else { reporter.EnsurePublished(ctx) @@ -1374,7 +1374,7 @@ attemptLoop: } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index fe4f22f2e..eb17864d6 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -472,7 +472,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): @@ -512,7 +512,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 36c041b6e..a1bbe6b84 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -524,7 +524,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index 86078aacc..2b56f13b1 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -580,7 +580,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr terminateReason = "read_error" terminateErr = errRead helps.RecordAPIWebsocketError(ctx, e.cfg, "read", errRead) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errRead) _ = send(cliproxyexecutor.StreamChunk{Err: errRead}) return } @@ -590,7 +590,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr terminateReason = "unexpected_binary" terminateErr = err helps.RecordAPIWebsocketError(ctx, e.cfg, "unexpected_binary", err) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, err) if sess != nil { e.invalidateUpstreamConn(sess, conn, "unexpected_binary", err) } @@ -610,7 +610,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr terminateReason = "upstream_error" terminateErr = wsErr helps.RecordAPIWebsocketError(ctx, e.cfg, "upstream_error", wsErr) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, wsErr) if sess != nil { e.invalidateUpstreamConn(sess, conn, "upstream_error", wsErr) } diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index 0fa7cbb2d..a298fe8a0 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -430,7 +430,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): @@ -444,7 +444,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut data, errRead := io.ReadAll(resp.Body) if errRead != nil { helps.RecordAPIResponseError(ctx, e.cfg, errRead) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errRead) select { case out <- cliproxyexecutor.StreamChunk{Err: errRead}: case <-ctx.Done(): diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index c3f080107..e8fa2e405 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -341,7 +341,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/gemini_vertex_executor.go b/internal/runtime/executor/gemini_vertex_executor.go index ae0a718b8..b899524c6 100644 --- a/internal/runtime/executor/gemini_vertex_executor.go +++ b/internal/runtime/executor/gemini_vertex_executor.go @@ -679,7 +679,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): @@ -821,7 +821,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/helps/usage_helpers.go b/internal/runtime/executor/helps/usage_helpers.go index c72b5c1ae..bef0b4eab 100644 --- a/internal/runtime/executor/helps/usage_helpers.go +++ b/internal/runtime/executor/helps/usage_helpers.go @@ -3,6 +3,7 @@ package helps import ( "bytes" "context" + "errors" "fmt" "strings" "sync" @@ -51,7 +52,7 @@ func NewUsageReporter(ctx context.Context, provider, model string, auth *cliprox } func (r *UsageReporter) Publish(ctx context.Context, detail usage.Detail) { - r.publishWithOutcome(ctx, detail, false) + r.publishWithOutcome(ctx, detail, false, usage.Failure{}) } func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string, detail usage.Detail) { @@ -74,11 +75,11 @@ func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.De if !hasNonZeroTokenUsage(detail) { return usage.Record{}, false } - return r.buildRecordForModel(model, detail, false), true + return r.buildRecordForModel(model, detail, false, usage.Failure{}), true } -func (r *UsageReporter) PublishFailure(ctx context.Context) { - r.publishWithOutcome(ctx, usage.Detail{}, true) +func (r *UsageReporter) PublishFailure(ctx context.Context, errs ...error) { + r.publishWithOutcome(ctx, usage.Detail{}, true, failFromErrors(errs...)) } func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) { @@ -86,17 +87,17 @@ func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) { return } if *errPtr != nil { - r.PublishFailure(ctx) + r.PublishFailure(ctx, *errPtr) } } -func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) { +func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool, fail usage.Failure) { if r == nil { return } detail = normalizeUsageDetailTotal(detail) r.once.Do(func() { - usage.PublishRecord(ctx, r.buildRecord(detail, failed)) + usage.PublishRecord(ctx, r.buildRecord(detail, failed, fail)) }) } @@ -127,20 +128,24 @@ func (r *UsageReporter) EnsurePublished(ctx context.Context) { return } r.once.Do(func() { - usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false)) + usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{})) }) } -func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool) usage.Record { - if r == nil { - return usage.Record{Detail: detail, Failed: failed} +func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record { + var fail usage.Failure + if len(failures) > 0 { + fail = failures[0] } - return r.buildRecordForModel(r.model, detail, failed) + if r == nil { + return usage.Record{Detail: detail, Failed: failed, Fail: fail} + } + return r.buildRecordForModel(r.model, detail, failed, fail) } -func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool) usage.Record { +func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool, fail usage.Failure) usage.Record { if r == nil { - return usage.Record{Model: model, Detail: detail, Failed: failed} + return usage.Record{Model: model, Detail: detail, Failed: failed, Fail: fail} } return usage.Record{ Provider: r.provider, @@ -154,10 +159,28 @@ func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, f RequestedAt: r.requestedAt, Latency: r.latency(), Failed: failed, + Fail: fail, Detail: detail, } } +func failFromErrors(errs ...error) usage.Failure { + for _, err := range errs { + if err == nil { + continue + } + fail := usage.Failure{ + Body: strings.TrimSpace(err.Error()), + } + var se interface{ StatusCode() int } + if errors.As(err, &se) && se != nil { + fail.StatusCode = se.StatusCode() + } + return fail + } + return usage.Failure{} +} + func (r *UsageReporter) latency() time.Duration { if r == nil || r.requestedAt.IsZero() { return 0 diff --git a/internal/runtime/executor/kimi_executor.go b/internal/runtime/executor/kimi_executor.go index f330321fa..6cfaec205 100644 --- a/internal/runtime/executor/kimi_executor.go +++ b/internal/runtime/executor/kimi_executor.go @@ -307,7 +307,7 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index de12da370..82fc9e97d 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -296,7 +296,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy if bytes.HasPrefix(trimmedLine, []byte("{")) || bytes.HasPrefix(trimmedLine, []byte("[")) { streamErr := statusErr{code: http.StatusBadGateway, msg: string(trimmedLine)} helps.RecordAPIResponseError(ctx, e.cfg, streamErr) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, streamErr) select { case out <- cliproxyexecutor.StreamChunk{Err: streamErr}: case <-ctx.Done(): @@ -318,7 +318,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) + reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index befdfe2cb..d339f56b3 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -51,6 +51,7 @@ type ExecutionSessionCloser interface { } const ( + homeAuthCountMetadataKey = "__cliproxy_home_auth_count" // CloseAllExecutionSessionsID asks an executor to release all active execution sessions. // Executors that do not support this marker may ignore it. CloseAllExecutionSessionsID = "__all_execution_sessions__" @@ -1316,19 +1317,25 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req } routeModel := req.Model opts = ensureRequestedModelMetadata(opts, routeModel) + homeMode := m.HomeEnabled() + homeAuthCount := 1 tried := make(map[string]struct{}) attempted := make(map[string]struct{}) var lastErr error for { - if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { + if !homeMode && maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { if lastErr != nil { return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} } - auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, opts, tried) + pickOpts := opts + if homeMode { + pickOpts = withHomeAuthCount(opts, homeAuthCount) + } + auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, pickOpts, tried) if errPick != nil { - if lastErr != nil { + if lastErr != nil && !homeMode { return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, errPick @@ -1384,6 +1391,9 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req return cliproxyexecutor.Response{}, authErr } lastErr = authErr + if homeMode { + homeAuthCount++ + } continue } } @@ -1395,19 +1405,25 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, } routeModel := req.Model opts = ensureRequestedModelMetadata(opts, routeModel) + homeMode := m.HomeEnabled() + homeAuthCount := 1 tried := make(map[string]struct{}) attempted := make(map[string]struct{}) var lastErr error for { - if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { + if !homeMode && maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { if lastErr != nil { return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} } - auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, opts, tried) + pickOpts := opts + if homeMode { + pickOpts = withHomeAuthCount(opts, homeAuthCount) + } + auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, pickOpts, tried) if errPick != nil { - if lastErr != nil { + if lastErr != nil && !homeMode { return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, errPick @@ -1463,6 +1479,9 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, return cliproxyexecutor.Response{}, authErr } lastErr = authErr + if homeMode { + homeAuthCount++ + } continue } } @@ -1474,19 +1493,25 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string } routeModel := req.Model opts = ensureRequestedModelMetadata(opts, routeModel) + homeMode := m.HomeEnabled() + homeAuthCount := 1 tried := make(map[string]struct{}) attempted := make(map[string]struct{}) var lastErr error for { - if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { + if !homeMode && maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { if lastErr != nil { return nil, lastErr } return nil, &Error{Code: "auth_not_found", Message: "no auth available"} } - auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, opts, tried) + pickOpts := opts + if homeMode { + pickOpts = withHomeAuthCount(opts, homeAuthCount) + } + auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, pickOpts, tried) if errPick != nil { - if lastErr != nil { + if lastErr != nil && !homeMode { return nil, lastErr } return nil, errPick @@ -1516,6 +1541,9 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string return nil, errStream } lastErr = errStream + if homeMode { + homeAuthCount++ + } continue } return streamResult, nil @@ -1543,6 +1571,40 @@ func ensureRequestedModelMetadata(opts cliproxyexecutor.Options, requestedModel return opts } +func withHomeAuthCount(opts cliproxyexecutor.Options, count int) cliproxyexecutor.Options { + if count <= 0 { + count = 1 + } + meta := make(map[string]any, len(opts.Metadata)+1) + for k, v := range opts.Metadata { + meta[k] = v + } + meta[homeAuthCountMetadataKey] = count + opts.Metadata = meta + return opts +} + +func homeAuthCountFromMetadata(meta map[string]any) int { + if len(meta) == 0 { + return 1 + } + switch value := meta[homeAuthCountMetadataKey].(type) { + case int: + if value > 0 { + return value + } + case int64: + if value > 0 { + return int(value) + } + case float64: + if value > 0 { + return int(value) + } + } + return 1 +} + func hasRequestedModelMetadata(meta map[string]any) bool { if len(meta) == 0 { return false @@ -3099,8 +3161,9 @@ func (m *Manager) pickNextViaHome(ctx context.Context, model string, opts clipro requestedModel := requestedModelFromMetadata(opts.Metadata, model) sessionID := ExtractSessionID(opts.Headers, opts.OriginalRequest, opts.Metadata) + count := homeAuthCountFromMetadata(opts.Metadata) - raw, err := client.RPopAuth(ctx, requestedModel, sessionID, opts.Headers) + raw, err := client.RPopAuth(ctx, requestedModel, sessionID, opts.Headers, count) if err != nil { return nil, nil, "", &Error{Code: "auth_not_found", Message: err.Error(), HTTPStatus: http.StatusServiceUnavailable} } diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 6a94878de..89a480b50 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -561,6 +561,7 @@ func forceHomeRuntimeConfig(cfg *config.Config) { return } cfg.APIKeys = nil + cfg.UsageStatisticsEnabled = true cfg.DisableCooling = true cfg.WebsocketAuth = false cfg.EnableGeminiCLIEndpoint = false @@ -732,6 +733,7 @@ func (s *Service) Run(ctx context.Context) error { homeEnabled := s.cfg != nil && s.cfg.Home.Enabled if homeEnabled { forceHomeRuntimeConfig(s.cfg) + redisqueue.SetUsageStatisticsEnabled(true) } shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) diff --git a/sdk/cliproxy/service_stale_state_test.go b/sdk/cliproxy/service_stale_state_test.go index 8943d6793..53849eb34 100644 --- a/sdk/cliproxy/service_stale_state_test.go +++ b/sdk/cliproxy/service_stale_state_test.go @@ -99,3 +99,32 @@ func TestServiceApplyCoreAuthAddOrUpdate_DeleteReAddDoesNotInheritStaleRuntimeSt t.Fatalf("expected re-added auth to re-register models in global registry") } } + +func TestForceHomeRuntimeConfigEnablesUsageStatistics(t *testing.T) { + cfg := &config.Config{ + UsageStatisticsEnabled: false, + } + + forceHomeRuntimeConfig(cfg) + + if !cfg.UsageStatisticsEnabled { + t.Fatal("expected home runtime config to force usage statistics enabled") + } +} + +func TestApplyHomeOverlayForcesUsageStatisticsEnabled(t *testing.T) { + baseCfg := &config.Config{} + baseCfg.Home.Enabled = true + service := &Service{cfg: baseCfg} + + service.applyHomeOverlay(&config.Config{ + UsageStatisticsEnabled: false, + }) + + if service.cfg == nil || !service.cfg.UsageStatisticsEnabled { + t.Fatal("expected home overlay to force usage statistics enabled") + } + if !service.cfg.Home.Enabled { + t.Fatal("expected home overlay to preserve local home settings") + } +} diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go index 72405d758..2305d9a48 100644 --- a/sdk/cliproxy/usage/manager.go +++ b/sdk/cliproxy/usage/manager.go @@ -22,9 +22,16 @@ type Record struct { RequestedAt time.Time Latency time.Duration Failed bool + Fail Failure Detail Detail } +// Failure holds HTTP failure metadata for an upstream request attempt. +type Failure struct { + StatusCode int + Body string +} + // Detail holds the token usage breakdown. type Detail struct { InputTokens int64