diff --git a/internal/logging/requestmeta.go b/internal/logging/requestmeta.go index a28d7c628..c7479dd9e 100644 --- a/internal/logging/requestmeta.go +++ b/internal/logging/requestmeta.go @@ -2,16 +2,24 @@ package logging import ( "context" + "net/http" + "sync" "sync/atomic" ) type endpointKey struct{} type responseStatusKey struct{} +type responseHeadersKey struct{} type responseStatusHolder struct { status atomic.Int32 } +type responseHeadersHolder struct { + mu sync.RWMutex + headers http.Header +} + func WithEndpoint(ctx context.Context, endpoint string) context.Context { if ctx == nil { ctx = context.Background() @@ -39,6 +47,16 @@ func WithResponseStatusHolder(ctx context.Context) context.Context { return context.WithValue(ctx, responseStatusKey{}, &responseStatusHolder{}) } +func WithResponseHeadersHolder(ctx context.Context) context.Context { + if ctx == nil { + ctx = context.Background() + } + if holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder); ok && holder != nil { + return ctx + } + return context.WithValue(ctx, responseHeadersKey{}, &responseHeadersHolder{}) +} + func SetResponseStatus(ctx context.Context, status int) { if ctx == nil || status <= 0 { return @@ -50,6 +68,19 @@ func SetResponseStatus(ctx context.Context, status int) { holder.status.Store(int32(status)) } +func SetResponseHeaders(ctx context.Context, headers http.Header) { + if ctx == nil { + return + } + holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder) + if !ok || holder == nil { + return + } + holder.mu.Lock() + defer holder.mu.Unlock() + holder.headers = cloneHTTPHeader(headers) +} + func GetResponseStatus(ctx context.Context) int { if ctx == nil { return 0 @@ -60,3 +91,27 @@ func GetResponseStatus(ctx context.Context) int { } return int(holder.status.Load()) } + +func GetResponseHeaders(ctx context.Context) http.Header { + if ctx == nil { + return nil + } + holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder) + if !ok || holder == nil { + return nil + } + holder.mu.RLock() + defer holder.mu.RUnlock() + return cloneHTTPHeader(holder.headers) +} + +func cloneHTTPHeader(src http.Header) http.Header { + if len(src) == 0 { + return nil + } + dst := make(http.Header, len(src)) + for key, values := range src { + dst[key] = append([]string(nil), values...) + } + return dst +} diff --git a/internal/redisqueue/plugin.go b/internal/redisqueue/plugin.go index 057052d14..158b5ed5e 100644 --- a/internal/redisqueue/plugin.go +++ b/internal/redisqueue/plugin.go @@ -3,6 +3,7 @@ package redisqueue import ( "context" "encoding/json" + "net/http" "strings" "time" @@ -71,13 +72,14 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec fail := resolveFail(ctx, record, failed) detail := requestDetail{ - Timestamp: timestamp, - LatencyMs: record.Latency.Milliseconds(), - Source: record.Source, - AuthIndex: record.AuthIndex, - Tokens: tokens, - Failed: failed, - Fail: fail, + Timestamp: timestamp, + LatencyMs: record.Latency.Milliseconds(), + Source: record.Source, + AuthIndex: record.AuthIndex, + Tokens: tokens, + Failed: failed, + Fail: fail, + ResponseHeaders: record.ResponseHeaders, } payload, err := json.Marshal(queuedUsageDetail{ @@ -108,13 +110,14 @@ type queuedUsageDetail struct { } type requestDetail struct { - Timestamp time.Time `json:"timestamp"` - LatencyMs int64 `json:"latency_ms"` - Source string `json:"source"` - AuthIndex string `json:"auth_index"` - Tokens tokenStats `json:"tokens"` - Failed bool `json:"failed"` - Fail failDetail `json:"fail"` + Timestamp time.Time `json:"timestamp"` + LatencyMs int64 `json:"latency_ms"` + Source string `json:"source"` + AuthIndex string `json:"auth_index"` + Tokens tokenStats `json:"tokens"` + Failed bool `json:"failed"` + Fail failDetail `json:"fail"` + ResponseHeaders http.Header `json:"response_headers,omitempty"` } type tokenStats struct { diff --git a/internal/redisqueue/plugin_test.go b/internal/redisqueue/plugin_test.go index e2af6af70..a3358d163 100644 --- a/internal/redisqueue/plugin_test.go +++ b/internal/redisqueue/plugin_test.go @@ -19,6 +19,9 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions") ctx = internallogging.WithResponseStatusHolder(ctx) internallogging.SetResponseStatus(ctx, http.StatusOK) + responseHeaders := http.Header{} + responseHeaders.Add("X-Upstream-Request-Id", "upstream-req-1") + responseHeaders.Add("Retry-After", "30") plugin := &usageQueuePlugin{} plugin.HandleUsage(ctx, coreusage.Record{ @@ -36,7 +39,9 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { OutputTokens: 20, TotalTokens: 30, }, + ResponseHeaders: responseHeaders.Clone(), }) + responseHeaders.Set("Retry-After", "999") payload := popSinglePayload(t) requireStringField(t, payload, "provider", "openai") @@ -46,11 +51,57 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { requireStringField(t, payload, "auth_type", "apikey") requireMissingField(t, payload, "user_api_key") requireStringField(t, payload, "request_id", "ctx-request-id") + requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"}) + requireHeaderField(t, payload, "response_headers", "Retry-After", []string{"30"}) requireBoolField(t, payload, "failed", false) requireFailField(t, payload, http.StatusOK, "") }) } +func TestUsageQueuePluginAsyncUsesRecordResponseHeaders(t *testing.T) { + withEnabledQueue(t, func() { + ctx := internallogging.WithRequestID(context.Background(), "ctx-request-id") + ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions") + ctx = internallogging.WithResponseStatusHolder(ctx) + ctx = internallogging.WithResponseHeadersHolder(ctx) + internallogging.SetResponseStatus(ctx, http.StatusOK) + initialHeaders := http.Header{} + initialHeaders.Set("X-Upstream-Request-Id", "upstream-req-1") + internallogging.SetResponseHeaders(ctx, initialHeaders) + + mgr := coreusage.NewManager(16) + defer mgr.Stop() + + mgr.Register(pluginFunc(func(ctx context.Context, _ coreusage.Record) { + nextHeaders := http.Header{} + nextHeaders.Set("X-Upstream-Request-Id", "upstream-req-2") + internallogging.SetResponseHeaders(ctx, nextHeaders) + })) + mgr.Register(&usageQueuePlugin{}) + + mgr.Publish(ctx, coreusage.Record{ + Provider: "openai", + Model: "gpt-5.4", + Alias: "client-gpt", + APIKey: "test-key", + AuthIndex: "0", + AuthType: "apikey", + Source: "user@example.com", + RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC), + Latency: 1500 * time.Millisecond, + Detail: coreusage.Detail{ + InputTokens: 10, + OutputTokens: 20, + TotalTokens: 30, + }, + ResponseHeaders: internallogging.GetResponseHeaders(ctx), + }) + + payload := waitForSinglePayload(t, 2*time.Second) + requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"}) + }) +} + func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t *testing.T) { withEnabledQueue(t, func() { ctx := internallogging.WithRequestID(context.Background(), "gin-request-id") @@ -276,3 +327,28 @@ func requireFailField(t *testing.T, payload map[string]json.RawMessage, wantStat t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", got.StatusCode, got.Body, wantStatus, wantBody) } } + +func requireHeaderField(t *testing.T, payload map[string]json.RawMessage, field, key string, want []string) { + t.Helper() + + raw, ok := payload[field] + if !ok { + t.Fatalf("payload missing %q", field) + } + var headers map[string][]string + if err := json.Unmarshal(raw, &headers); err != nil { + t.Fatalf("unmarshal %q: %v", field, err) + } + got, ok := headers[key] + if !ok { + t.Fatalf("%s missing header %q", field, key) + } + if len(got) != len(want) { + t.Fatalf("%s[%q] = %v, want %v", field, key, got, want) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("%s[%q] = %v, want %v", field, key, got, want) + } + } +} diff --git a/internal/runtime/executor/helps/logging_helpers.go b/internal/runtime/executor/helps/logging_helpers.go index fa7143347..87fc7ac34 100644 --- a/internal/runtime/executor/helps/logging_helpers.go +++ b/internal/runtime/executor/helps/logging_helpers.go @@ -102,6 +102,7 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ // RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt. func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) { + logging.SetResponseHeaders(ctx, headers) if cfg == nil || !cfg.RequestLog { return } @@ -227,6 +228,7 @@ func RecordAPIWebsocketRequest(ctx context.Context, cfg *config.Config, info Ups // RecordAPIWebsocketHandshake stores the upstream websocket handshake response metadata. func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status int, headers http.Header) { + logging.SetResponseHeaders(ctx, headers) if cfg == nil || !cfg.RequestLog { return } @@ -250,6 +252,7 @@ func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status // RecordAPIWebsocketUpgradeRejection stores a rejected websocket upgrade as an HTTP attempt. func RecordAPIWebsocketUpgradeRejection(ctx context.Context, cfg *config.Config, info UpstreamRequestLog, status int, headers http.Header, body []byte) { + logging.SetResponseHeaders(ctx, headers) if cfg == nil || !cfg.RequestLog { return } diff --git a/internal/runtime/executor/helps/logging_helpers_test.go b/internal/runtime/executor/helps/logging_helpers_test.go new file mode 100644 index 000000000..17ad24656 --- /dev/null +++ b/internal/runtime/executor/helps/logging_helpers_test.go @@ -0,0 +1,24 @@ +package helps + +import ( + "context" + "net/http" + "testing" + + "github.com/router-for-me/CLIProxyAPI/v7/internal/config" + "github.com/router-for-me/CLIProxyAPI/v7/internal/logging" +) + +func TestRecordAPIResponseMetadataStoresHeadersWhenRequestLogDisabled(t *testing.T) { + ctx := logging.WithResponseHeadersHolder(context.Background()) + headers := http.Header{} + headers.Add("X-Upstream-Request-Id", "upstream-req-1") + + RecordAPIResponseMetadata(ctx, &config.Config{}, http.StatusOK, headers) + headers.Set("X-Upstream-Request-Id", "mutated") + + got := logging.GetResponseHeaders(ctx) + if got.Get("X-Upstream-Request-Id") != "upstream-req-1" { + t.Fatalf("response header = %q, want %q", got.Get("X-Upstream-Request-Id"), "upstream-req-1") + } +} diff --git a/internal/runtime/executor/helps/usage_helpers.go b/internal/runtime/executor/helps/usage_helpers.go index a507a73e5..d711b91a7 100644 --- a/internal/runtime/executor/helps/usage_helpers.go +++ b/internal/runtime/executor/helps/usage_helpers.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gin-gonic/gin" + internallogging "github.com/router-for-me/CLIProxyAPI/v7/internal/logging" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" "github.com/tidwall/gjson" @@ -60,7 +61,7 @@ func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string if !ok { return } - usage.PublishRecord(ctx, record) + r.publishRecord(ctx, record) } func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) { @@ -97,7 +98,7 @@ func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Det } detail = normalizeUsageDetailTotal(detail) r.once.Do(func() { - usage.PublishRecord(ctx, r.buildRecord(detail, failed, fail)) + r.publishRecord(ctx, r.buildRecord(detail, failed, fail)) }) } @@ -130,10 +131,15 @@ func (r *UsageReporter) EnsurePublished(ctx context.Context) { return } r.once.Do(func() { - usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{})) + r.publishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{})) }) } +func (r *UsageReporter) publishRecord(ctx context.Context, record usage.Record) { + record.ResponseHeaders = internallogging.GetResponseHeaders(ctx) + usage.PublishRecord(ctx, record) +} + func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record { var fail usage.Failure if len(failures) > 0 { diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 6e0adb641..7c8416df4 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -400,6 +400,7 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * newCtx = logging.WithEndpoint(newCtx, endpoint) } newCtx = logging.WithResponseStatusHolder(newCtx) + newCtx = logging.WithResponseHeadersHolder(newCtx) cancelCtx := newCtx if requestCtx != nil && requestCtx != parentCtx { diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go index 7bc73114e..2cdd34716 100644 --- a/sdk/cliproxy/usage/manager.go +++ b/sdk/cliproxy/usage/manager.go @@ -2,6 +2,7 @@ package usage import ( "context" + "net/http" "strings" "sync" "time" @@ -24,6 +25,8 @@ type Record struct { Failed bool Fail Failure Detail Detail + // ResponseHeaders stores a snapshot of upstream response headers for usage sinks. + ResponseHeaders http.Header } // Failure holds HTTP failure metadata for an upstream request attempt.