mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-09 15:53:18 +08:00
feat(executor): add TTFT tracking and reporting for enhanced performance metrics
- Introduced Time-To-First-Token (TTFT) measurement and reporting across major executors.
- Added TTFT calculation to `UsageReporter`, including support for HTTP clients and WebSocket communication.
- Updated tests to validate TTFT tracking in streamed and non-streamed scenarios.
- Ensured integration with `usage` plugin and augmented usage records with TTFT data.
This commit is contained in:
@@ -78,6 +78,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
|
||||
detail := requestDetail{
|
||||
Timestamp: timestamp,
|
||||
LatencyMs: record.Latency.Milliseconds(),
|
||||
TTFTMs: record.TTFT.Milliseconds(),
|
||||
Source: record.Source,
|
||||
AuthIndex: record.AuthIndex,
|
||||
Tokens: tokens,
|
||||
@@ -118,6 +119,7 @@ type queuedUsageDetail struct {
|
||||
type requestDetail struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
LatencyMs int64 `json:"latency_ms"`
|
||||
TTFTMs int64 `json:"ttft_ms"`
|
||||
Source string `json:"source"`
|
||||
AuthIndex string `json:"auth_index"`
|
||||
Tokens tokenStats `json:"tokens"`
|
||||
|
||||
@@ -168,13 +168,16 @@ func (e *AIStudioExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth,
|
||||
AuthValue: authValue,
|
||||
})
|
||||
|
||||
reporter.StartResponseTTFT()
|
||||
wsResp, err := e.relay.NonStream(ctx, authID, wsReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
return resp, err
|
||||
}
|
||||
helps.RecordAPIResponseMetadata(ctx, e.cfg, wsResp.Status, wsResp.Headers.Clone())
|
||||
reporter.StartResponseTTFT()
|
||||
if len(wsResp.Body) > 0 {
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, wsResp.Body)
|
||||
}
|
||||
if wsResp.Status < 200 || wsResp.Status >= 300 {
|
||||
@@ -231,6 +234,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
||||
AuthType: authType,
|
||||
AuthValue: authValue,
|
||||
})
|
||||
reporter.StartResponseTTFT()
|
||||
wsStream, err := e.relay.Stream(ctx, authID, wsReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -246,10 +250,12 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
||||
metadataLogged := false
|
||||
if firstEvent.Status > 0 {
|
||||
helps.RecordAPIResponseMetadata(ctx, e.cfg, firstEvent.Status, firstEvent.Headers.Clone())
|
||||
reporter.StartResponseTTFT()
|
||||
metadataLogged = true
|
||||
}
|
||||
var body bytes.Buffer
|
||||
if len(firstEvent.Payload) > 0 {
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, firstEvent.Payload)
|
||||
body.Write(firstEvent.Payload)
|
||||
}
|
||||
@@ -266,9 +272,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
||||
}
|
||||
if !metadataLogged && event.Status > 0 {
|
||||
helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone())
|
||||
reporter.StartResponseTTFT()
|
||||
metadataLogged = true
|
||||
}
|
||||
if len(event.Payload) > 0 {
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload)
|
||||
body.Write(event.Payload)
|
||||
}
|
||||
@@ -297,10 +305,12 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
||||
case wsrelay.MessageTypeStreamStart:
|
||||
if !metadataLogged && event.Status > 0 {
|
||||
helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone())
|
||||
reporter.StartResponseTTFT()
|
||||
metadataLogged = true
|
||||
}
|
||||
case wsrelay.MessageTypeStreamChunk:
|
||||
if len(event.Payload) > 0 {
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload)
|
||||
filtered := helps.FilterSSEUsageMetadata(event.Payload)
|
||||
if detail, ok := helps.ParseGeminiStreamUsage(filtered); ok {
|
||||
@@ -321,9 +331,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
||||
case wsrelay.MessageTypeHTTPResp:
|
||||
if !metadataLogged && event.Status > 0 {
|
||||
helps.RecordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone())
|
||||
reporter.StartResponseTTFT()
|
||||
metadataLogged = true
|
||||
}
|
||||
if len(event.Payload) > 0 {
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, event.Payload)
|
||||
}
|
||||
lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, event.Payload, &param)
|
||||
|
||||
138
internal/runtime/executor/aistudio_executor_test.go
Normal file
138
internal/runtime/executor/aistudio_executor_test.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/wsrelay"
|
||||
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage"
|
||||
sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator"
|
||||
)
|
||||
|
||||
func TestAIStudioExecutorExecuteStartsTTFTBeforeRelayWait(t *testing.T) {
|
||||
const authID = "aistudio-ttft-auth"
|
||||
delay := 40 * time.Millisecond
|
||||
connected := make(chan struct{})
|
||||
var connectedOnce sync.Once
|
||||
relay := wsrelay.NewManager(wsrelay.Options{
|
||||
ProviderFactory: func(*http.Request) (string, error) {
|
||||
return authID, nil
|
||||
},
|
||||
OnConnected: func(provider string) {
|
||||
if provider == authID {
|
||||
connectedOnce.Do(func() {
|
||||
close(connected)
|
||||
})
|
||||
}
|
||||
},
|
||||
})
|
||||
server := httptest.NewServer(relay.Handler())
|
||||
defer server.Close()
|
||||
defer func() {
|
||||
if errStop := relay.Stop(context.Background()); errStop != nil {
|
||||
t.Errorf("relay stop error = %v", errStop)
|
||||
}
|
||||
}()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + relay.Path()
|
||||
conn, _, errDial := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if errDial != nil {
|
||||
t.Fatalf("dial websocket: %v", errDial)
|
||||
}
|
||||
defer func() {
|
||||
if errClose := conn.Close(); errClose != nil {
|
||||
t.Errorf("websocket close error = %v", errClose)
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-connected:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for relay connection")
|
||||
}
|
||||
|
||||
clientDone := make(chan error, 1)
|
||||
go func() {
|
||||
var msg wsrelay.Message
|
||||
if errReadJSON := conn.ReadJSON(&msg); errReadJSON != nil {
|
||||
clientDone <- fmt.Errorf("read relay request: %w", errReadJSON)
|
||||
return
|
||||
}
|
||||
if msg.Type != wsrelay.MessageTypeHTTPReq {
|
||||
clientDone <- fmt.Errorf("relay message type = %q, want %q", msg.Type, wsrelay.MessageTypeHTTPReq)
|
||||
return
|
||||
}
|
||||
time.Sleep(delay)
|
||||
response := wsrelay.Message{
|
||||
ID: msg.ID,
|
||||
Type: wsrelay.MessageTypeHTTPResp,
|
||||
Payload: map[string]any{
|
||||
"status": float64(http.StatusOK),
|
||||
"headers": map[string]any{"Content-Type": "application/json"},
|
||||
"body": `{"candidates":[{"content":{"role":"model","parts":[{"text":"ok"}]},"finishReason":"STOP"}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}`,
|
||||
},
|
||||
}
|
||||
if errWriteJSON := conn.WriteJSON(response); errWriteJSON != nil {
|
||||
clientDone <- fmt.Errorf("write relay response: %w", errWriteJSON)
|
||||
return
|
||||
}
|
||||
clientDone <- nil
|
||||
}()
|
||||
|
||||
plugin := &captureAIStudioUsagePlugin{records: make(chan usage.Record, 16)}
|
||||
usage.RegisterPlugin(plugin)
|
||||
exec := NewAIStudioExecutor(&config.Config{}, "aistudio", relay)
|
||||
_, errExecute := exec.Execute(context.Background(), &cliproxyauth.Auth{ID: authID, Provider: "aistudio"}, cliproxyexecutor.Request{
|
||||
Model: "gemini-3.1-pro-preview",
|
||||
Payload: []byte(`{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}`),
|
||||
}, cliproxyexecutor.Options{SourceFormat: sdktranslator.FormatGemini})
|
||||
if errExecute != nil {
|
||||
t.Fatalf("Execute() error = %v", errExecute)
|
||||
}
|
||||
if errClient := <-clientDone; errClient != nil {
|
||||
t.Fatal(errClient)
|
||||
}
|
||||
|
||||
record := waitForAIStudioUsageRecord(t, plugin.records, "gemini-3.1-pro-preview")
|
||||
if record.TTFT < delay {
|
||||
t.Fatalf("ttft = %v, want >= %v", record.TTFT, delay)
|
||||
}
|
||||
}
|
||||
|
||||
type captureAIStudioUsagePlugin struct {
|
||||
records chan usage.Record
|
||||
}
|
||||
|
||||
func (p *captureAIStudioUsagePlugin) HandleUsage(_ context.Context, record usage.Record) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case p.records <- record:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func waitForAIStudioUsageRecord(t *testing.T, records <-chan usage.Record, model string) usage.Record {
|
||||
t.Helper()
|
||||
timeout := time.After(2 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case record := <-records:
|
||||
if record.Provider == "aistudio" && record.Model == model {
|
||||
return record
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out waiting for AI Studio usage record")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -529,6 +529,7 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
|
||||
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
attemptLoop:
|
||||
@@ -728,6 +729,7 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *
|
||||
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
@@ -1190,6 +1192,7 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
|
||||
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
|
||||
@@ -227,6 +227,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
||||
})
|
||||
|
||||
httpClient := helps.NewUtlsHTTPClient(e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -402,6 +403,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
})
|
||||
|
||||
httpClient := helps.NewUtlsHTTPClient(e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -311,6 +311,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
||||
AuthValue: authValue,
|
||||
})
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -468,6 +469,7 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A
|
||||
AuthValue: authValue,
|
||||
})
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -571,6 +573,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -107,6 +107,7 @@ func (e *CodexExecutor) executeOpenAIImage(ctx context.Context, auth *cliproxyau
|
||||
recordCodexOpenAIImageRequest(ctx, e.cfg, e.Identifier(), auth, url, httpReq.Header.Clone(), body)
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
@@ -196,6 +197,7 @@ func (e *CodexExecutor) executeOpenAIImageStream(ctx context.Context, auth *clip
|
||||
recordCodexOpenAIImageRequest(ctx, e.cfg, e.Identifier(), auth, url, httpReq.Header.Clone(), body)
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
|
||||
@@ -269,6 +269,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
return resp, errDial
|
||||
}
|
||||
recordAPIWebsocketHandshake(ctx, e.cfg, respHS)
|
||||
reporter.StartResponseTTFT()
|
||||
if sess == nil {
|
||||
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
||||
defer func() {
|
||||
@@ -312,6 +313,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
AuthValue: authValue,
|
||||
})
|
||||
recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry)
|
||||
reporter.StartResponseTTFT()
|
||||
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry == nil {
|
||||
conn = connRetry
|
||||
wsReqBody = wsReqBodyRetry
|
||||
@@ -356,6 +358,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
if len(payload) == 0 {
|
||||
continue
|
||||
}
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload)
|
||||
|
||||
if wsErr, ok := parseCodexWebsocketError(payload); ok {
|
||||
@@ -476,6 +479,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
return nil, errDial
|
||||
}
|
||||
recordAPIWebsocketHandshake(ctx, e.cfg, respHS)
|
||||
reporter.StartResponseTTFT()
|
||||
|
||||
if sess == nil {
|
||||
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
||||
@@ -514,6 +518,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
AuthValue: authValue,
|
||||
})
|
||||
recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry)
|
||||
reporter.StartResponseTTFT()
|
||||
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry != nil {
|
||||
helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry)
|
||||
e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry)
|
||||
@@ -606,6 +611,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
if len(payload) == 0 {
|
||||
continue
|
||||
}
|
||||
reporter.MarkFirstResponseByte()
|
||||
helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload)
|
||||
|
||||
if wsErr, ok := parseCodexWebsocketError(payload); ok {
|
||||
|
||||
@@ -158,6 +158,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
||||
}
|
||||
|
||||
httpClient := newHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
respCtx := context.WithValue(ctx, "alt", opts.Alt)
|
||||
|
||||
var authID, authLabel, authType, authValue string
|
||||
@@ -310,6 +311,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
||||
}
|
||||
|
||||
httpClient := newHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
respCtx := context.WithValue(ctx, "alt", opts.Alt)
|
||||
|
||||
var authID, authLabel, authType, authValue string
|
||||
|
||||
@@ -183,6 +183,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -289,6 +290,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -395,6 +395,7 @@ func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, au
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
@@ -518,6 +519,7 @@ func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *clip
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
@@ -630,6 +632,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
@@ -773,6 +776,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -29,6 +31,10 @@ type UsageReporter struct {
|
||||
source string
|
||||
reasoning string
|
||||
requestedAt time.Time
|
||||
ttftMu sync.RWMutex
|
||||
ttft time.Duration
|
||||
ttftStart time.Time
|
||||
ttftSet bool
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
@@ -74,6 +80,64 @@ func (r *UsageReporter) SetTranslatedReasoningEffort(payload []byte, format stri
|
||||
r.reasoning = thinking.ExtractTranslatedReasoningEffort(payload, format)
|
||||
}
|
||||
|
||||
func (r *UsageReporter) TrackHTTPClient(client *http.Client) *http.Client {
|
||||
if r == nil || client == nil {
|
||||
return client
|
||||
}
|
||||
tracked := *client
|
||||
transport := tracked.Transport
|
||||
if transport == nil {
|
||||
transport = http.DefaultTransport
|
||||
}
|
||||
tracked.Transport = usageTTFTRoundTripper{
|
||||
base: transport,
|
||||
reporter: r,
|
||||
}
|
||||
return &tracked
|
||||
}
|
||||
|
||||
func (r *UsageReporter) ObserveResponse(resp *http.Response) {
|
||||
if r == nil || resp == nil || resp.Body == nil {
|
||||
return
|
||||
}
|
||||
r.StartResponseTTFT()
|
||||
resp.Body = &usageTTFTReadCloser{
|
||||
ReadCloser: resp.Body,
|
||||
mark: func() {
|
||||
r.MarkFirstResponseByte()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *UsageReporter) StartResponseTTFT() {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.ttftMu.Lock()
|
||||
if !r.ttftSet && r.ttftStart.IsZero() {
|
||||
r.ttftStart = time.Now()
|
||||
}
|
||||
r.ttftMu.Unlock()
|
||||
}
|
||||
|
||||
func (r *UsageReporter) MarkFirstResponseByte() {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.ttftMu.Lock()
|
||||
if r.ttftSet {
|
||||
r.ttftMu.Unlock()
|
||||
return
|
||||
}
|
||||
start := r.ttftStart
|
||||
r.ttftStart = time.Time{}
|
||||
r.ttftMu.Unlock()
|
||||
if start.IsZero() {
|
||||
return
|
||||
}
|
||||
r.setTTFT(time.Since(start))
|
||||
}
|
||||
|
||||
func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) {
|
||||
if r == nil {
|
||||
return usage.Record{}, false
|
||||
@@ -177,6 +241,7 @@ func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, f
|
||||
ReasoningEffort: r.reasoning,
|
||||
RequestedAt: r.requestedAt,
|
||||
Latency: r.latency(),
|
||||
TTFT: r.ttftDuration(),
|
||||
Failed: failed,
|
||||
Fail: fail,
|
||||
Detail: detail,
|
||||
@@ -211,6 +276,65 @@ func (r *UsageReporter) latency() time.Duration {
|
||||
return latency
|
||||
}
|
||||
|
||||
func (r *UsageReporter) setTTFT(ttft time.Duration) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
if ttft < 0 {
|
||||
ttft = 0
|
||||
}
|
||||
r.ttftMu.Lock()
|
||||
if r.ttftSet {
|
||||
r.ttftMu.Unlock()
|
||||
return
|
||||
}
|
||||
r.ttft = ttft
|
||||
r.ttftSet = true
|
||||
r.ttftStart = time.Time{}
|
||||
r.ttftMu.Unlock()
|
||||
}
|
||||
|
||||
func (r *UsageReporter) ttftDuration() time.Duration {
|
||||
if r == nil {
|
||||
return 0
|
||||
}
|
||||
r.ttftMu.RLock()
|
||||
defer r.ttftMu.RUnlock()
|
||||
return r.ttft
|
||||
}
|
||||
|
||||
type usageTTFTRoundTripper struct {
|
||||
base http.RoundTripper
|
||||
reporter *UsageReporter
|
||||
}
|
||||
|
||||
func (t usageTTFTRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
t.reporter.StartResponseTTFT()
|
||||
resp, errRoundTrip := t.base.RoundTrip(req)
|
||||
if errRoundTrip != nil {
|
||||
return resp, errRoundTrip
|
||||
}
|
||||
t.reporter.ObserveResponse(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// usageTTFTReadCloser wraps a response body and invokes mark exactly once, the
// first time a Read call returns at least one byte.
//
// NOTE(review): only Read and Close are exposed through the embedded
// io.ReadCloser; any additional interfaces implemented by the wrapped body are
// not forwarded.
type usageTTFTReadCloser struct {
	io.ReadCloser
	once sync.Once
	mark func()
}

// Read delegates to the wrapped body and triggers mark on the first read that
// yields data. It returns io.ErrClosedPipe when the wrapper or the wrapped
// body is nil.
func (r *usageTTFTReadCloser) Read(p []byte) (int, error) {
	if r == nil || r.ReadCloser == nil {
		return 0, io.ErrClosedPipe
	}
	count, err := r.ReadCloser.Read(p)
	if count <= 0 || r.mark == nil {
		return count, err
	}
	// sync.Once keeps later non-empty reads from firing mark again.
	r.once.Do(r.mark)
	return count, err
}
|
||||
|
||||
func APIKeyFromContext(ctx context.Context) string {
|
||||
if ctx == nil {
|
||||
return ""
|
||||
|
||||
@@ -2,6 +2,9 @@ package helps
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -146,6 +149,41 @@ func TestUsageReporterBuildRecordIncludesLatency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUsageReporterTrackHTTPClientStartsTTFTBeforeRoundTrip(t *testing.T) {
|
||||
delay := 40 * time.Millisecond
|
||||
reporter := NewUsageReporter(context.Background(), "openai", "gpt-5.4", nil)
|
||||
client := reporter.TrackHTTPClient(&http.Client{
|
||||
Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) {
|
||||
time.Sleep(delay)
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Status: "200 OK",
|
||||
Header: make(http.Header),
|
||||
Body: io.NopCloser(strings.NewReader("ok")),
|
||||
Request: req,
|
||||
}, nil
|
||||
}),
|
||||
})
|
||||
|
||||
req, errNewRequest := http.NewRequestWithContext(context.Background(), http.MethodPost, "https://example.invalid/v1/chat/completions", strings.NewReader("{}"))
|
||||
if errNewRequest != nil {
|
||||
t.Fatalf("NewRequestWithContext() error = %v", errNewRequest)
|
||||
}
|
||||
resp, errDo := client.Do(req)
|
||||
if errDo != nil {
|
||||
t.Fatalf("Do() error = %v", errDo)
|
||||
}
|
||||
if _, errRead := io.ReadAll(resp.Body); errRead != nil {
|
||||
t.Fatalf("ReadAll() error = %v", errRead)
|
||||
}
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
t.Fatalf("response body close error = %v", errClose)
|
||||
}
|
||||
if got := reporter.ttftDuration(); got < delay {
|
||||
t.Fatalf("ttft = %v, want >= %v", got, delay)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUsageReporterBuildRecordIncludesRequestedModelAlias(t *testing.T) {
|
||||
ctx := usage.WithRequestedModelAlias(context.Background(), "client-gpt")
|
||||
reporter := NewUsageReporter(ctx, "openai", "gpt-5.4", nil)
|
||||
@@ -186,3 +224,9 @@ func TestUsageReporterBuildAdditionalModelRecordSkipsZeroTokens(t *testing.T) {
|
||||
t.Fatalf("expected non-zero cached token usage to be recorded")
|
||||
}
|
||||
}
|
||||
|
||||
// roundTripFunc adapts a plain function to the http.RoundTripper interface so
// tests can supply a transport inline.
type roundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip calls the underlying function with req and returns its result
// unchanged.
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := f(req)
	return resp, err
}
|
||||
|
||||
@@ -146,6 +146,7 @@ func (e *KimiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -257,6 +258,7 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -162,6 +162,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -252,6 +253,7 @@ func (e *OpenAICompatExecutor) executeImages(ctx context.Context, auth *cliproxy
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -360,6 +362,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -510,6 +513,7 @@ func (e *OpenAICompatExecutor) executeImagesStream(ctx context.Context, auth *cl
|
||||
})
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -127,6 +127,7 @@ func (e *XAIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body)
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
@@ -314,6 +315,7 @@ func (e *XAIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth
|
||||
e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body)
|
||||
|
||||
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
httpClient = reporter.TrackHTTPClient(httpClient)
|
||||
httpResp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
helps.RecordAPIResponseError(ctx, e.cfg, err)
|
||||
|
||||
@@ -24,6 +24,7 @@ type Record struct {
|
||||
ReasoningEffort string
|
||||
RequestedAt time.Time
|
||||
Latency time.Duration
|
||||
TTFT time.Duration
|
||||
Failed bool
|
||||
Fail Failure
|
||||
Detail Detail
|
||||
|
||||
Reference in New Issue
Block a user