diff --git a/internal/redisqueue/plugin.go b/internal/redisqueue/plugin.go index eb3c8c822..ac48d0c13 100644 --- a/internal/redisqueue/plugin.go +++ b/internal/redisqueue/plugin.go @@ -78,6 +78,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec detail := requestDetail{ Timestamp: timestamp, LatencyMs: record.Latency.Milliseconds(), + TTFTMs: record.TTFT.Milliseconds(), Source: record.Source, AuthIndex: record.AuthIndex, Tokens: tokens, @@ -118,6 +119,7 @@ type queuedUsageDetail struct { type requestDetail struct { Timestamp time.Time `json:"timestamp"` LatencyMs int64 `json:"latency_ms"` + TTFTMs int64 `json:"ttft_ms"` Source string `json:"source"` AuthIndex string `json:"auth_index"` Tokens tokenStats `json:"tokens"` diff --git a/internal/runtime/executor/aistudio_executor.go b/internal/runtime/executor/aistudio_executor.go index ad15114a3..0e2718c72 100644 --- a/internal/runtime/executor/aistudio_executor.go +++ b/internal/runtime/executor/aistudio_executor.go @@ -168,13 +168,16 @@ func (e *AIStudioExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, AuthValue: authValue, }) + reporter.StartResponseTTFT() wsResp, err := e.relay.NonStream(ctx, authID, wsReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } helps.RecordAPIResponseMetadata(ctx, e.cfg, wsResp.Status, wsResp.Headers.Clone()) + reporter.StartResponseTTFT() if len(wsResp.Body) > 0 { + reporter.MarkFirstResponseByte() helps.AppendAPIResponseChunk(ctx, e.cfg, wsResp.Body) } if wsResp.Status < 200 || wsResp.Status >= 300 { @@ -231,6 +234,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth AuthType: authType, AuthValue: authValue, }) + reporter.StartResponseTTFT() wsStream, err := e.relay.Stream(ctx, authID, wsReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -246,10 +250,12 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth metadataLogged := false if firstEvent.Status > 0 { helps.RecordAPIResponseMetadata(ctx, e.cfg, firstEvent.Status, firstEvent.Headers.Clone()) + reporter.StartResponseTTFT() metadataLogged = true } var body bytes.Buffer if len(firstEvent.Payload) > 0 { + reporter.MarkFirstResponseByte() helps.AppendAPIResponseChunk(ctx, e.cfg, firstEvent.Payload) body.Write(firstEvent.Payload) } @@ -266,9 +272,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth } if !metadataLogged && event.Status > 0 { helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone()) + reporter.StartResponseTTFT() metadataLogged = true } if len(event.Payload) > 0 { + reporter.MarkFirstResponseByte() helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload) body.Write(event.Payload) } @@ -297,10 +305,12 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth case wsrelay.MessageTypeStreamStart: if !metadataLogged && event.Status > 0 { helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone()) + reporter.StartResponseTTFT() metadataLogged = true } case wsrelay.MessageTypeStreamChunk: if len(event.Payload) > 0 { + reporter.MarkFirstResponseByte() helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload) filtered := helps.FilterSSEUsageMetadata(event.Payload) if detail, ok := helps.ParseGeminiStreamUsage(filtered); ok { @@ -321,9 +331,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth case wsrelay.MessageTypeHTTPResp: if !metadataLogged && event.Status > 0 { helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone()) + reporter.StartResponseTTFT() metadataLogged = true } if len(event.Payload) > 0 { + reporter.MarkFirstResponseByte() helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload) } lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, event.Payload, ¶m) diff --git a/internal/runtime/executor/aistudio_executor_test.go b/internal/runtime/executor/aistudio_executor_test.go new file mode 100644 index 000000000..52ce6147a --- /dev/null +++ b/internal/runtime/executor/aistudio_executor_test.go @@ -0,0 +1,138 @@ +package executor + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/router-for-me/CLIProxyAPI/v7/internal/config" + "github.com/router-for-me/CLIProxyAPI/v7/internal/wsrelay" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" + "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator" +) + +func TestAIStudioExecutorExecuteStartsTTFTBeforeRelayWait(t *testing.T) { + const authID = "aistudio-ttft-auth" + delay := 40 * time.Millisecond + connected := make(chan struct{}) + var connectedOnce sync.Once + relay := wsrelay.NewManager(wsrelay.Options{ + ProviderFactory: func(*http.Request) (string, error) { + return authID, nil + }, + OnConnected: func(provider string) { + if provider == authID { + connectedOnce.Do(func() { + close(connected) + }) + } + }, + }) + server := httptest.NewServer(relay.Handler()) + defer server.Close() + defer func() { + if errStop := relay.Stop(context.Background()); errStop != nil { + t.Errorf("relay stop error = %v", errStop) + } + }() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + relay.Path() + conn, _, errDial := websocket.DefaultDialer.Dial(wsURL, nil) + if errDial != nil { + t.Fatalf("dial websocket: %v", errDial) + } + defer func() { + if errClose := conn.Close(); errClose != nil { + t.Errorf("websocket close error = %v", errClose) + } + }() + select { + case <-connected: + case <-time.After(time.Second): + t.Fatal("timed out waiting for relay connection") + } + + clientDone := make(chan error, 1) + go func() { + var msg wsrelay.Message + if errReadJSON := conn.ReadJSON(&msg); errReadJSON != nil { + clientDone <- fmt.Errorf("read relay request: %w", errReadJSON) + return + } + if msg.Type != wsrelay.MessageTypeHTTPReq { + clientDone <- fmt.Errorf("relay message type = %q, want %q", msg.Type, wsrelay.MessageTypeHTTPReq) + return + } + time.Sleep(delay) + response := wsrelay.Message{ + ID: msg.ID, + Type: wsrelay.MessageTypeHTTPResp, + Payload: map[string]any{ + "status": float64(http.StatusOK), + "headers": map[string]any{"Content-Type": "application/json"}, + "body": `{"candidates":[{"content":{"role":"model","parts":[{"text":"ok"}]},"finishReason":"STOP"}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}`, + }, + } + if errWriteJSON := conn.WriteJSON(response); errWriteJSON != nil { + clientDone <- fmt.Errorf("write relay response: %w", errWriteJSON) + return + } + clientDone <- nil + }() + + plugin := &captureAIStudioUsagePlugin{records: make(chan usage.Record, 16)} + usage.RegisterPlugin(plugin) + exec := NewAIStudioExecutor(&config.Config{}, "aistudio", relay) + _, errExecute := exec.Execute(context.Background(), &cliproxyauth.Auth{ID: authID, Provider: "aistudio"}, cliproxyexecutor.Request{ + Model: "gemini-3.1-pro-preview", + Payload: []byte(`{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}`), + }, cliproxyexecutor.Options{SourceFormat: sdktranslator.FormatGemini}) + if errExecute != nil { + t.Fatalf("Execute() error = %v", errExecute) + } + if errClient := <-clientDone; errClient != nil { + t.Fatal(errClient) + } + + record := waitForAIStudioUsageRecord(t, plugin.records, "gemini-3.1-pro-preview") + if record.TTFT < delay { + t.Fatalf("ttft = %v, want >= %v", record.TTFT, delay) + } +} + +type captureAIStudioUsagePlugin struct { + records chan usage.Record +} + +func (p *captureAIStudioUsagePlugin) HandleUsage(_ context.Context, record usage.Record) { + if p == nil { + return + } + select { + case p.records <- record: + default: + } +} + +func waitForAIStudioUsageRecord(t *testing.T, records <-chan usage.Record, model string) usage.Record { + t.Helper() + timeout := time.After(2 * time.Second) + for { + select { + case record := <-records: + if record.Provider == "aistudio" && record.Model == model { + return record + } + case <-timeout: + t.Fatalf("timed out waiting for AI Studio usage record") + } + } +} diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 77f840cb1..408a490d0 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -529,6 +529,7 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) attempts := antigravityRetryAttempts(auth, e.cfg) attemptLoop: @@ -728,6 +729,7 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) attempts := antigravityRetryAttempts(auth, e.cfg) @@ -1190,6 +1192,7 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) attempts := antigravityRetryAttempts(auth, e.cfg) diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 8d8ea4dbf..626a90abe 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -227,6 +227,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r }) httpClient := helps.NewUtlsHTTPClient(e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -402,6 +403,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A }) httpClient := helps.NewUtlsHTTPClient(e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 317bc4d25..a5899efbb 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -311,6 +311,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re AuthValue: authValue, }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -468,6 +469,7 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A AuthValue: authValue, }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -571,6 +573,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/internal/runtime/executor/codex_openai_images.go b/internal/runtime/executor/codex_openai_images.go index 211f89357..415cdf1c7 100644 --- a/internal/runtime/executor/codex_openai_images.go +++ b/internal/runtime/executor/codex_openai_images.go @@ -107,6 +107,7 @@ func (e *CodexExecutor) executeOpenAIImage(ctx context.Context, auth *cliproxyau recordCodexOpenAIImageRequest(ctx, e.cfg, e.Identifier(), auth, url, httpReq.Header.Clone(), body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) @@ -196,6 +197,7 @@ func (e *CodexExecutor) executeOpenAIImageStream(ctx context.Context, auth *clip recordCodexOpenAIImageRequest(ctx, e.cfg, e.Identifier(), auth, url, httpReq.Header.Clone(), body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index e3ce9ce0c..5594356bb 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -269,6 +269,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut return resp, errDial } recordAPIWebsocketHandshake(ctx, e.cfg, respHS) + reporter.StartResponseTTFT() if sess == nil { logCodexWebsocketConnected(executionSessionID, authID, wsURL) defer func() { @@ -312,6 +313,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut AuthValue: authValue, }) recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry) + reporter.StartResponseTTFT() if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry == nil { conn = connRetry wsReqBody = wsReqBodyRetry @@ -356,6 +358,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut if len(payload) == 0 { continue } + reporter.MarkFirstResponseByte() helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload) if wsErr, ok := parseCodexWebsocketError(payload); ok { @@ -476,6 +479,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr return nil, errDial } recordAPIWebsocketHandshake(ctx, e.cfg, respHS) + reporter.StartResponseTTFT() if sess == nil { logCodexWebsocketConnected(executionSessionID, authID, wsURL) @@ -514,6 +518,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr AuthValue: authValue, }) recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry) + reporter.StartResponseTTFT() if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry != nil { helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry) e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry) @@ -606,6 +611,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr if len(payload) == 0 { continue } + reporter.MarkFirstResponseByte() helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload) if wsErr, ok := parseCodexWebsocketError(payload); ok { diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index da4440400..d6b97021b 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -158,6 +158,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth } httpClient := newHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) respCtx := context.WithValue(ctx, "alt", opts.Alt) var authID, authLabel, authType, authValue string @@ -310,6 +311,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut } httpClient := newHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) respCtx := context.WithValue(ctx, "alt", opts.Alt) var authID, authLabel, authType, authValue string diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index 99c06dbdc..2f4f1935e 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -183,6 +183,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -289,6 +290,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/internal/runtime/executor/gemini_vertex_executor.go b/internal/runtime/executor/gemini_vertex_executor.go index 98e46221b..50c22b9cd 100644 --- a/internal/runtime/executor/gemini_vertex_executor.go +++ b/internal/runtime/executor/gemini_vertex_executor.go @@ -395,6 +395,7 @@ func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, au }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) @@ -518,6 +519,7 @@ func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *clip }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) @@ -630,6 +632,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) @@ -773,6 +776,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { helps.RecordAPIResponseError(ctx, e.cfg, errDo) diff --git a/internal/runtime/executor/helps/usage_helpers.go b/internal/runtime/executor/helps/usage_helpers.go index 82f82a440..1c4f4cdf7 100644 --- a/internal/runtime/executor/helps/usage_helpers.go +++ b/internal/runtime/executor/helps/usage_helpers.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "io" + "net/http" "strings" "sync" "time" @@ -29,6 +31,10 @@ type UsageReporter struct { source string reasoning string requestedAt time.Time + ttftMu sync.RWMutex + ttft time.Duration + ttftStart time.Time + ttftSet bool once sync.Once } @@ -74,6 +80,64 @@ func (r *UsageReporter) SetTranslatedReasoningEffort(payload []byte, format stri r.reasoning = thinking.ExtractTranslatedReasoningEffort(payload, format) } +func (r *UsageReporter) TrackHTTPClient(client *http.Client) *http.Client { + if r == nil || client == nil { + return client + } + tracked := *client + transport := tracked.Transport + if transport == nil { + transport = http.DefaultTransport + } + tracked.Transport = usageTTFTRoundTripper{ + base: transport, + reporter: r, + } + return &tracked +} + +func (r *UsageReporter) ObserveResponse(resp *http.Response) { + if r == nil || resp == nil || resp.Body == nil { + return + } + r.StartResponseTTFT() + resp.Body = &usageTTFTReadCloser{ + ReadCloser: resp.Body, + mark: func() { + r.MarkFirstResponseByte() + }, + } +} + +func (r *UsageReporter) StartResponseTTFT() { + if r == nil { + return + } + r.ttftMu.Lock() + if !r.ttftSet && r.ttftStart.IsZero() { + r.ttftStart = time.Now() + } + r.ttftMu.Unlock() +} + +func (r *UsageReporter) MarkFirstResponseByte() { + if r == nil { + return + } + r.ttftMu.Lock() + if r.ttftSet { + r.ttftMu.Unlock() + return + } + start := r.ttftStart + r.ttftStart = time.Time{} + r.ttftMu.Unlock() + if start.IsZero() { + return + } + r.setTTFT(time.Since(start)) +} + func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) { if r == nil { return usage.Record{}, false @@ -177,6 +241,7 @@ func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, f ReasoningEffort: r.reasoning, RequestedAt: r.requestedAt, Latency: r.latency(), + TTFT: r.ttftDuration(), Failed: failed, Fail: fail, Detail: detail, @@ -211,6 +276,65 @@ func (r *UsageReporter) latency() time.Duration { return latency } +func (r *UsageReporter) setTTFT(ttft time.Duration) { + if r == nil { + return + } + if ttft < 0 { + ttft = 0 + } + r.ttftMu.Lock() + if r.ttftSet { + r.ttftMu.Unlock() + return + } + r.ttft = ttft + r.ttftSet = true + r.ttftStart = time.Time{} + r.ttftMu.Unlock() +} + +func (r *UsageReporter) ttftDuration() time.Duration { + if r == nil { + return 0 + } + r.ttftMu.RLock() + defer r.ttftMu.RUnlock() + return r.ttft +} + +type usageTTFTRoundTripper struct { + base http.RoundTripper + reporter *UsageReporter +} + +func (t usageTTFTRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + t.reporter.StartResponseTTFT() + resp, errRoundTrip := t.base.RoundTrip(req) + if errRoundTrip != nil { + return resp, errRoundTrip + } + t.reporter.ObserveResponse(resp) + return resp, nil +} + +type usageTTFTReadCloser struct { + io.ReadCloser + once sync.Once + mark func() +} + +func (r *usageTTFTReadCloser) Read(p []byte) (int, error) { + if r == nil || r.ReadCloser == nil { + return 0, io.ErrClosedPipe + } + n, errRead := r.ReadCloser.Read(p) + if n > 0 && r.mark != nil { + r.once.Do(r.mark) + } + return n, errRead +} + func APIKeyFromContext(ctx context.Context) string { if ctx == nil { return "" diff --git a/internal/runtime/executor/helps/usage_helpers_test.go b/internal/runtime/executor/helps/usage_helpers_test.go index 330641c61..58b175f3b 100644 --- a/internal/runtime/executor/helps/usage_helpers_test.go +++ b/internal/runtime/executor/helps/usage_helpers_test.go @@ -2,6 +2,9 @@ package helps import ( "context" + "io" + "net/http" + "strings" "testing" "time" @@ -146,6 +149,41 @@ func TestUsageReporterBuildRecordIncludesLatency(t *testing.T) { } } +func TestUsageReporterTrackHTTPClientStartsTTFTBeforeRoundTrip(t *testing.T) { + delay := 40 * time.Millisecond + reporter := NewUsageReporter(context.Background(), "openai", "gpt-5.4", nil) + client := reporter.TrackHTTPClient(&http.Client{ + Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) { + time.Sleep(delay) + return &http.Response{ + StatusCode: http.StatusOK, + Status: "200 OK", + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("ok")), + Request: req, + }, nil + }), + }) + + req, errNewRequest := http.NewRequestWithContext(context.Background(), http.MethodPost, "https://example.invalid/v1/chat/completions", strings.NewReader("{}")) + if errNewRequest != nil { + t.Fatalf("NewRequestWithContext() error = %v", errNewRequest) + } + resp, errDo := client.Do(req) + if errDo != nil { + t.Fatalf("Do() error = %v", errDo) + } + if _, errRead := io.ReadAll(resp.Body); errRead != nil { + t.Fatalf("ReadAll() error = %v", errRead) + } + if errClose := resp.Body.Close(); errClose != nil { + t.Fatalf("response body close error = %v", errClose) + } + if got := reporter.ttftDuration(); got < delay { + t.Fatalf("ttft = %v, want >= %v", got, delay) + } +} + func TestUsageReporterBuildRecordIncludesRequestedModelAlias(t *testing.T) { ctx := usage.WithRequestedModelAlias(context.Background(), "client-gpt") reporter := NewUsageReporter(ctx, "openai", "gpt-5.4", nil) @@ -186,3 +224,9 @@ func TestUsageReporterBuildAdditionalModelRecordSkipsZeroTokens(t *testing.T) { t.Fatalf("expected non-zero cached token usage to be recorded") } } + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} diff --git a/internal/runtime/executor/kimi_executor.go b/internal/runtime/executor/kimi_executor.go index 154215823..d7ab643ad 100644 --- a/internal/runtime/executor/kimi_executor.go +++ b/internal/runtime/executor/kimi_executor.go @@ -146,6 +146,7 @@ func (e *KimiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -257,6 +258,7 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 24aa661dd..8475e372a 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -162,6 +162,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -252,6 +253,7 @@ func (e *OpenAICompatExecutor) executeImages(ctx context.Context, auth *cliproxy }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -360,6 +362,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -510,6 +513,7 @@ func (e *OpenAICompatExecutor) executeImagesStream(ctx context.Context, auth *cl }) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/internal/runtime/executor/xai_executor.go b/internal/runtime/executor/xai_executor.go index aabd5772d..cb42f9393 100644 --- a/internal/runtime/executor/xai_executor.go +++ b/internal/runtime/executor/xai_executor.go @@ -127,6 +127,7 @@ func (e *XAIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) @@ -314,6 +315,7 @@ func (e *XAIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go index 731fd8d04..6113ca1eb 100644 --- a/sdk/cliproxy/usage/manager.go +++ b/sdk/cliproxy/usage/manager.go @@ -24,6 +24,7 @@ type Record struct { ReasoningEffort string RequestedAt time.Time Latency time.Duration + TTFT time.Duration Failed bool Fail Failure Detail Detail