Files
CLIProxyAPI/internal/redisqueue/plugin_test.go
Luis Pater ad98c9549a feat(runtime): track upstream response headers in logging and usage reporting
- Added APIs to store, retrieve, and clone upstream response headers in context for detailed logging.
- Updated `RecordAPIResponseMetadata`, `RecordAPIWebsocketHandshake`, and related methods to capture response headers.
- Extended `UsageReporter` to include response headers in published usage records.
- Enhanced payload tests to validate response headers' integrity and persistence.
- Refactored `usage.Record` to support optional `ResponseHeaders` field.
2026-05-19 01:29:23 +08:00

355 lines
11 KiB
Go

package redisqueue
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gin-gonic/gin"
internallogging "github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
coreusage "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage"
)
func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
withEnabledQueue(t, func() {
ctx := internallogging.WithRequestID(context.Background(), "ctx-request-id")
ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions")
ctx = internallogging.WithResponseStatusHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusOK)
responseHeaders := http.Header{}
responseHeaders.Add("X-Upstream-Request-Id", "upstream-req-1")
responseHeaders.Add("Retry-After", "30")
plugin := &usageQueuePlugin{}
plugin.HandleUsage(ctx, coreusage.Record{
Provider: "openai",
Model: "gpt-5.4",
Alias: "client-gpt",
APIKey: "test-key",
AuthIndex: "0",
AuthType: "apikey",
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 1500 * time.Millisecond,
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
TotalTokens: 30,
},
ResponseHeaders: responseHeaders.Clone(),
})
responseHeaders.Set("Retry-After", "999")
payload := popSinglePayload(t)
requireStringField(t, payload, "provider", "openai")
requireStringField(t, payload, "model", "gpt-5.4")
requireStringField(t, payload, "alias", "client-gpt")
requireStringField(t, payload, "endpoint", "POST /v1/chat/completions")
requireStringField(t, payload, "auth_type", "apikey")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "ctx-request-id")
requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"})
requireHeaderField(t, payload, "response_headers", "Retry-After", []string{"30"})
requireBoolField(t, payload, "failed", false)
requireFailField(t, payload, http.StatusOK, "")
})
}
func TestUsageQueuePluginAsyncUsesRecordResponseHeaders(t *testing.T) {
withEnabledQueue(t, func() {
ctx := internallogging.WithRequestID(context.Background(), "ctx-request-id")
ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions")
ctx = internallogging.WithResponseStatusHolder(ctx)
ctx = internallogging.WithResponseHeadersHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusOK)
initialHeaders := http.Header{}
initialHeaders.Set("X-Upstream-Request-Id", "upstream-req-1")
internallogging.SetResponseHeaders(ctx, initialHeaders)
mgr := coreusage.NewManager(16)
defer mgr.Stop()
mgr.Register(pluginFunc(func(ctx context.Context, _ coreusage.Record) {
nextHeaders := http.Header{}
nextHeaders.Set("X-Upstream-Request-Id", "upstream-req-2")
internallogging.SetResponseHeaders(ctx, nextHeaders)
}))
mgr.Register(&usageQueuePlugin{})
mgr.Publish(ctx, coreusage.Record{
Provider: "openai",
Model: "gpt-5.4",
Alias: "client-gpt",
APIKey: "test-key",
AuthIndex: "0",
AuthType: "apikey",
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 1500 * time.Millisecond,
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
TotalTokens: 30,
},
ResponseHeaders: internallogging.GetResponseHeaders(ctx),
})
payload := waitForSinglePayload(t, 2*time.Second)
requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"})
})
}
func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t *testing.T) {
withEnabledQueue(t, func() {
ctx := internallogging.WithRequestID(context.Background(), "gin-request-id")
ctx = internallogging.WithEndpoint(ctx, "GET /v1/responses")
ctx = internallogging.WithResponseStatusHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusInternalServerError)
plugin := &usageQueuePlugin{}
plugin.HandleUsage(ctx, coreusage.Record{
Provider: "openai",
Model: "gpt-5.4-mini",
Alias: "client-mini",
APIKey: "test-key",
AuthIndex: "0",
AuthType: "apikey",
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 2500 * time.Millisecond,
Fail: coreusage.Failure{
StatusCode: http.StatusInternalServerError,
Body: "upstream failed",
},
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
TotalTokens: 30,
},
})
payload := popSinglePayload(t)
requireStringField(t, payload, "provider", "openai")
requireStringField(t, payload, "model", "gpt-5.4-mini")
requireStringField(t, payload, "alias", "client-mini")
requireStringField(t, payload, "endpoint", "GET /v1/responses")
requireStringField(t, payload, "auth_type", "apikey")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "gin-request-id")
requireBoolField(t, payload, "failed", true)
requireFailField(t, payload, http.StatusInternalServerError, "upstream failed")
})
}
func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) {
withEnabledQueue(t, func() {
ginCtx := newTestGinContext(t, http.MethodPost, "/v1/chat/completions", http.StatusOK)
ctx := context.WithValue(context.Background(), "gin", ginCtx)
ctx = internallogging.WithRequestID(ctx, "ctx-request-id")
ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions")
ctx = internallogging.WithResponseStatusHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusInternalServerError)
mgr := coreusage.NewManager(16)
defer mgr.Stop()
mgr.Register(pluginFunc(func(_ context.Context, _ coreusage.Record) {
ginCtx.Request = httptest.NewRequest(http.MethodGet, "http://example.com/v1/responses", nil)
ginCtx.Status(http.StatusOK)
}))
mgr.Register(&usageQueuePlugin{})
mgr.Publish(ctx, coreusage.Record{
Provider: "openai",
Model: "gpt-5.4",
Alias: "client-gpt",
APIKey: "test-key",
AuthIndex: "0",
AuthType: "apikey",
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 1500 * time.Millisecond,
Fail: coreusage.Failure{
StatusCode: http.StatusBadGateway,
Body: "bad gateway",
},
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
TotalTokens: 30,
},
})
payload := waitForSinglePayload(t, 2*time.Second)
requireStringField(t, payload, "endpoint", "POST /v1/chat/completions")
requireStringField(t, payload, "alias", "client-gpt")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "ctx-request-id")
requireBoolField(t, payload, "failed", true)
requireFailField(t, payload, http.StatusBadGateway, "bad gateway")
})
}
func withEnabledQueue(t *testing.T, fn func()) {
t.Helper()
prevQueueEnabled := Enabled()
prevUsageEnabled := UsageStatisticsEnabled()
SetEnabled(false)
SetEnabled(true)
SetUsageStatisticsEnabled(true)
defer func() {
SetEnabled(false)
SetEnabled(prevQueueEnabled)
SetUsageStatisticsEnabled(prevUsageEnabled)
}()
fn()
}
func newTestGinContext(t *testing.T, method, path string, status int) *gin.Context {
t.Helper()
gin.SetMode(gin.TestMode)
recorder := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(recorder)
ginCtx.Request = httptest.NewRequest(method, "http://example.com"+path, nil)
if status != 0 {
ginCtx.Status(status)
}
return ginCtx
}
func popSinglePayload(t *testing.T) map[string]json.RawMessage {
t.Helper()
items := PopOldest(10)
if len(items) != 1 {
t.Fatalf("PopOldest() items = %d, want 1", len(items))
}
var payload map[string]json.RawMessage
if err := json.Unmarshal(items[0], &payload); err != nil {
t.Fatalf("unmarshal payload: %v", err)
}
return payload
}
func waitForSinglePayload(t *testing.T, timeout time.Duration) map[string]json.RawMessage {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
items := PopOldest(10)
if len(items) == 0 {
time.Sleep(10 * time.Millisecond)
continue
}
if len(items) != 1 {
t.Fatalf("PopOldest() items = %d, want 1", len(items))
}
var payload map[string]json.RawMessage
if err := json.Unmarshal(items[0], &payload); err != nil {
t.Fatalf("unmarshal payload: %v", err)
}
return payload
}
t.Fatalf("timeout waiting for queued payload")
return nil
}
func requireStringField(t *testing.T, payload map[string]json.RawMessage, key, want string) {
t.Helper()
raw, ok := payload[key]
if !ok {
t.Fatalf("payload missing %q", key)
}
var got string
if err := json.Unmarshal(raw, &got); err != nil {
t.Fatalf("unmarshal %q: %v", key, err)
}
if got != want {
t.Fatalf("%s = %q, want %q", key, got, want)
}
}
func requireMissingField(t *testing.T, payload map[string]json.RawMessage, key string) {
t.Helper()
if _, ok := payload[key]; ok {
t.Fatalf("payload unexpectedly contains %q", key)
}
}
type pluginFunc func(context.Context, coreusage.Record)
func (fn pluginFunc) HandleUsage(ctx context.Context, record coreusage.Record) {
fn(ctx, record)
}
func requireBoolField(t *testing.T, payload map[string]json.RawMessage, key string, want bool) {
t.Helper()
raw, ok := payload[key]
if !ok {
t.Fatalf("payload missing %q", key)
}
var got bool
if err := json.Unmarshal(raw, &got); err != nil {
t.Fatalf("unmarshal %q: %v", key, err)
}
if got != want {
t.Fatalf("%s = %t, want %t", key, got, want)
}
}
func requireFailField(t *testing.T, payload map[string]json.RawMessage, wantStatus int, wantBody string) {
t.Helper()
raw, ok := payload["fail"]
if !ok {
t.Fatalf("payload missing %q", "fail")
}
var got struct {
StatusCode int `json:"status_code"`
Body string `json:"body"`
}
if err := json.Unmarshal(raw, &got); err != nil {
t.Fatalf("unmarshal fail: %v", err)
}
if got.StatusCode != wantStatus || got.Body != wantBody {
t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", got.StatusCode, got.Body, wantStatus, wantBody)
}
}
func requireHeaderField(t *testing.T, payload map[string]json.RawMessage, field, key string, want []string) {
t.Helper()
raw, ok := payload[field]
if !ok {
t.Fatalf("payload missing %q", field)
}
var headers map[string][]string
if err := json.Unmarshal(raw, &headers); err != nil {
t.Fatalf("unmarshal %q: %v", field, err)
}
got, ok := headers[key]
if !ok {
t.Fatalf("%s missing header %q", field, key)
}
if len(got) != len(want) {
t.Fatalf("%s[%q] = %v, want %v", field, key, got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("%s[%q] = %v, want %v", field, key, got, want)
}
}
}