diff --git a/internal/logging/requestmeta.go b/internal/logging/requestmeta.go new file mode 100644 index 000000000..a28d7c628 --- /dev/null +++ b/internal/logging/requestmeta.go @@ -0,0 +1,62 @@ +package logging + +import ( + "context" + "sync/atomic" +) + +type endpointKey struct{} +type responseStatusKey struct{} + +type responseStatusHolder struct { + status atomic.Int32 +} + +func WithEndpoint(ctx context.Context, endpoint string) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, endpointKey{}, endpoint) +} + +func GetEndpoint(ctx context.Context) string { + if ctx == nil { + return "" + } + if endpoint, ok := ctx.Value(endpointKey{}).(string); ok { + return endpoint + } + return "" +} + +func WithResponseStatusHolder(ctx context.Context) context.Context { + if ctx == nil { + ctx = context.Background() + } + if holder, ok := ctx.Value(responseStatusKey{}).(*responseStatusHolder); ok && holder != nil { + return ctx + } + return context.WithValue(ctx, responseStatusKey{}, &responseStatusHolder{}) +} + +func SetResponseStatus(ctx context.Context, status int) { + if ctx == nil || status <= 0 { + return + } + holder, ok := ctx.Value(responseStatusKey{}).(*responseStatusHolder) + if !ok || holder == nil { + return + } + holder.status.Store(int32(status)) +} + +func GetResponseStatus(ctx context.Context) int { + if ctx == nil { + return 0 + } + holder, ok := ctx.Value(responseStatusKey{}).(*responseStatusHolder) + if !ok || holder == nil { + return 0 + } + return int(holder.status.Load()) +} diff --git a/internal/redisqueue/plugin.go b/internal/redisqueue/plugin.go index a805e5dad..39739dbe4 100644 --- a/internal/redisqueue/plugin.go +++ b/internal/redisqueue/plugin.go @@ -3,11 +3,9 @@ package redisqueue import ( "context" "encoding/json" - "net/http" "strings" "time" - "github.com/gin-gonic/gin" internallogging "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" internalusage "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" @@ -46,11 +44,6 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec } apiKey := strings.TrimSpace(record.APIKey) requestID := strings.TrimSpace(internallogging.GetRequestID(ctx)) - if requestID == "" { - if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { - requestID = strings.TrimSpace(internallogging.GetGinRequestID(ginCtx)) - } - } tokens := internalusage.TokenStats{ InputTokens: record.Detail.InputTokens, @@ -106,40 +99,15 @@ type queuedUsageDetail struct { } func resolveSuccess(ctx context.Context) bool { - if ctx == nil { - return true - } - ginCtx, ok := ctx.Value("gin").(*gin.Context) - if !ok || ginCtx == nil { - return true - } - status := ginCtx.Writer.Status() + status := internallogging.GetResponseStatus(ctx) if status == 0 { return true } - return status < http.StatusBadRequest + return status < httpStatusBadRequest } func resolveEndpoint(ctx context.Context) string { - if ctx == nil { - return "" - } - ginCtx, ok := ctx.Value("gin").(*gin.Context) - if !ok || ginCtx == nil || ginCtx.Request == nil { - return "" - } - - path := strings.TrimSpace(ginCtx.FullPath()) - if path == "" && ginCtx.Request.URL != nil { - path = strings.TrimSpace(ginCtx.Request.URL.Path) - } - if path == "" { - return "" - } - - method := strings.TrimSpace(ginCtx.Request.Method) - if method == "" { - return path - } - return method + " " + path + return strings.TrimSpace(internallogging.GetEndpoint(ctx)) } + +const httpStatusBadRequest = 400 diff --git a/internal/redisqueue/plugin_test.go b/internal/redisqueue/plugin_test.go index 907b8aeeb..1e8bda482 100644 --- a/internal/redisqueue/plugin_test.go +++ b/internal/redisqueue/plugin_test.go @@ -16,9 +16,10 @@ import ( func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { withEnabledQueue(t, func() { - ginCtx := newTestGinContext(t, http.MethodPost, "/v1/chat/completions", http.StatusOK) - internallogging.SetGinRequestID(ginCtx, "gin-request-id-ignored") - ctx := context.WithValue(internallogging.WithRequestID(context.Background(), "ctx-request-id"), "gin", ginCtx) + ctx := internallogging.WithRequestID(context.Background(), "ctx-request-id") + ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions") + ctx = internallogging.WithResponseStatusHolder(ctx) + internallogging.SetResponseStatus(ctx, http.StatusOK) plugin := &usageQueuePlugin{} plugin.HandleUsage(ctx, coreusage.Record{ @@ -49,9 +50,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) { func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t *testing.T) { withEnabledQueue(t, func() { - ginCtx := newTestGinContext(t, http.MethodGet, "/v1/responses", http.StatusInternalServerError) - internallogging.SetGinRequestID(ginCtx, "gin-request-id") - ctx := context.WithValue(context.Background(), "gin", ginCtx) + ctx := internallogging.WithRequestID(context.Background(), "gin-request-id") + ctx = internallogging.WithEndpoint(ctx, "GET /v1/responses") + ctx = internallogging.WithResponseStatusHolder(ctx) + internallogging.SetResponseStatus(ctx, http.StatusInternalServerError) plugin := &usageQueuePlugin{} plugin.HandleUsage(ctx, coreusage.Record{ @@ -80,6 +82,47 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t }) } +func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) { + withEnabledQueue(t, func() { + ginCtx := newTestGinContext(t, http.MethodPost, "/v1/chat/completions", http.StatusOK) + ctx := context.WithValue(context.Background(), "gin", ginCtx) + ctx = internallogging.WithRequestID(ctx, "ctx-request-id") + ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions") + ctx = internallogging.WithResponseStatusHolder(ctx) + internallogging.SetResponseStatus(ctx, http.StatusInternalServerError) + + mgr := coreusage.NewManager(16) + defer mgr.Stop() + + mgr.Register(pluginFunc(func(_ context.Context, _ coreusage.Record) { + ginCtx.Request = httptest.NewRequest(http.MethodGet, "http://example.com/v1/responses", nil) + ginCtx.Status(http.StatusOK) + })) + mgr.Register(&usageQueuePlugin{}) + + mgr.Publish(ctx, coreusage.Record{ + Provider: "openai", + Model: "gpt-5.4", + APIKey: "test-key", + AuthIndex: "0", + AuthType: "apikey", + Source: "user@example.com", + RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC), + Latency: 1500 * time.Millisecond, + Detail: coreusage.Detail{ + InputTokens: 10, + OutputTokens: 20, + TotalTokens: 30, + }, + }) + + payload := waitForSinglePayload(t, 2*time.Second) + requireStringField(t, payload, "endpoint", "POST /v1/chat/completions") + requireStringField(t, payload, "request_id", "ctx-request-id") + requireBoolField(t, payload, "failed", true) + }) +} + func withEnabledQueue(t *testing.T, fn func()) { t.Helper() @@ -127,6 +170,29 @@ func popSinglePayload(t *testing.T) map[string]json.RawMessage { return payload } +func waitForSinglePayload(t *testing.T, timeout time.Duration) map[string]json.RawMessage { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + items := PopOldest(10) + if len(items) == 0 { + time.Sleep(10 * time.Millisecond) + continue + } + if len(items) != 1 { + t.Fatalf("PopOldest() items = %d, want 1", len(items)) + } + var payload map[string]json.RawMessage + if err := json.Unmarshal(items[0], &payload); err != nil { + t.Fatalf("unmarshal payload: %v", err) + } + return payload + } + t.Fatalf("timeout waiting for queued payload") + return nil +} + func requireStringField(t *testing.T, payload map[string]json.RawMessage, key, want string) { t.Helper() @@ -143,6 +209,12 @@ func requireStringField(t *testing.T, payload map[string]json.RawMessage, key, w } } +type pluginFunc func(context.Context, coreusage.Record) + +func (fn pluginFunc) HandleUsage(ctx context.Context, record coreusage.Record) { + fn(ctx, record) +} + func requireBoolField(t *testing.T, payload map[string]json.RawMessage, key string, want bool) { t.Helper() diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index 803d005ee..9d59de4fe 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - "github.com/gin-gonic/gin" + internallogging "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" ) @@ -401,21 +401,8 @@ func dedupKey(apiName, modelName string, detail RequestDetail) string { func resolveAPIIdentifier(ctx context.Context, record coreusage.Record) string { if ctx != nil { - if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { - path := ginCtx.FullPath() - if path == "" && ginCtx.Request != nil { - path = ginCtx.Request.URL.Path - } - method := "" - if ginCtx.Request != nil { - method = ginCtx.Request.Method - } - if path != "" { - if method != "" { - return method + " " + path - } - return path - } + if endpoint := strings.TrimSpace(internallogging.GetEndpoint(ctx)); endpoint != "" { + return endpoint } } if record.Provider != "" { @@ -425,14 +412,7 @@ func resolveAPIIdentifier(ctx context.Context, record coreusage.Record) string { } func resolveSuccess(ctx context.Context) bool { - if ctx == nil { - return true - } - ginCtx, ok := ctx.Value("gin").(*gin.Context) - if !ok || ginCtx == nil { - return true - } - status := ginCtx.Writer.Status() + status := internallogging.GetResponseStatus(ctx) if status == 0 { return true } diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 22f7c41a1..52b2a4fde 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -375,11 +375,32 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * if requestCtx != nil && logging.GetRequestID(parentCtx) == "" { if requestID := logging.GetRequestID(requestCtx); requestID != "" { parentCtx = logging.WithRequestID(parentCtx, requestID) - } else if requestID := logging.GetGinRequestID(c); requestID != "" { + } else if requestID = logging.GetGinRequestID(c); requestID != "" { parentCtx = logging.WithRequestID(parentCtx, requestID) } } newCtx, cancel := context.WithCancel(parentCtx) + + endpoint := "" + if c != nil && c.Request != nil { + path := strings.TrimSpace(c.FullPath()) + if path == "" && c.Request.URL != nil { + path = strings.TrimSpace(c.Request.URL.Path) + } + if path != "" { + method := strings.TrimSpace(c.Request.Method) + if method != "" { + endpoint = method + " " + path + } else { + endpoint = path + } + } + } + if endpoint != "" { + newCtx = logging.WithEndpoint(newCtx, endpoint) + } + newCtx = logging.WithResponseStatusHolder(newCtx) + cancelCtx := newCtx if requestCtx != nil && requestCtx != parentCtx { go func() { @@ -393,6 +414,9 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * newCtx = context.WithValue(newCtx, "gin", c) newCtx = context.WithValue(newCtx, "handler", handler) return newCtx, func(params ...interface{}) { + if c != nil { + logging.SetResponseStatus(cancelCtx, c.Writer.Status()) + } if h.Cfg.RequestLog && len(params) == 1 { if existing, exists := c.Get("API_RESPONSE"); exists { if existingBytes, ok := existing.([]byte); ok && len(bytes.TrimSpace(existingBytes)) > 0 {