package helps import ( "bytes" "context" "errors" "fmt" "io" "net/http" "reflect" "strings" "sync" "time" "github.com/gin-gonic/gin" internallogging "github.com/router-for-me/CLIProxyAPI/v7/internal/logging" "github.com/router-for-me/CLIProxyAPI/v7/internal/thinking" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) type UsageReporter struct { provider string executorType string model string alias string authID string authIndex string authType string apiKey string source string reasoning string serviceTier string requestedAt time.Time ttftMu sync.RWMutex ttft time.Duration ttftStart time.Time ttftSet bool once sync.Once } type usageExecutor interface { Identifier() string } func NewExecutorUsageReporter(ctx context.Context, executor usageExecutor, model string, auth *cliproxyauth.Auth) *UsageReporter { provider := "" if executor != nil { provider = executor.Identifier() } reporter := NewUsageReporter(ctx, provider, model, auth) reporter.executorType = ExecutorTypeName(executor) return reporter } func NewUsageReporter(ctx context.Context, provider, model string, auth *cliproxyauth.Auth) *UsageReporter { apiKey := APIKeyFromContext(ctx) alias := usage.RequestedModelAliasFromContext(ctx) if alias == "" { alias = model } reporter := &UsageReporter{ provider: provider, model: model, alias: strings.TrimSpace(alias), requestedAt: time.Now(), apiKey: apiKey, source: resolveUsageSource(auth, apiKey), authType: resolveUsageAuthType(auth), reasoning: usage.ReasoningEffortFromContext(ctx), serviceTier: usage.ServiceTierFromContext(ctx), } if auth != nil { reporter.authID = auth.ID reporter.authIndex = auth.EnsureIndex() } return reporter } func ExecutorTypeName(executor any) string { if executor == nil { return "" } executorType := reflect.TypeOf(executor) for executorType.Kind() == reflect.Pointer { executorType = executorType.Elem() } return strings.TrimSpace(executorType.Name()) } func (r *UsageReporter) Publish(ctx context.Context, detail usage.Detail) { r.publishWithOutcome(ctx, detail, false, usage.Failure{}) } func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string, detail usage.Detail) { record, ok := r.buildAdditionalModelRecord(model, detail) if !ok { return } r.publishRecord(ctx, record) } func (r *UsageReporter) SetTranslatedReasoningEffort(payload []byte, format string) { if r == nil { return } r.reasoning = thinking.ExtractTranslatedReasoningEffort(payload, format) r.serviceTier = extractServiceTierFromPayload(payload) } func (r *UsageReporter) TrackHTTPClient(client *http.Client) *http.Client { if r == nil || client == nil { return client } tracked := *client transport := tracked.Transport if transport == nil { transport = http.DefaultTransport } tracked.Transport = usageTTFTRoundTripper{ base: transport, reporter: r, } return &tracked } func (r *UsageReporter) ObserveResponse(resp *http.Response) { if r == nil || resp == nil || resp.Body == nil { return } r.StartResponseTTFT() resp.Body = &usageTTFTReadCloser{ ReadCloser: resp.Body, mark: func() { r.MarkFirstResponseByte() }, } } func (r *UsageReporter) StartResponseTTFT() { if r == nil { return } r.ttftMu.Lock() if !r.ttftSet && r.ttftStart.IsZero() { r.ttftStart = time.Now() } r.ttftMu.Unlock() } func (r *UsageReporter) MarkFirstResponseByte() { if r == nil { return } r.ttftMu.Lock() if r.ttftSet { r.ttftMu.Unlock() return } start := r.ttftStart r.ttftStart = time.Time{} r.ttftMu.Unlock() if start.IsZero() { return } r.setTTFT(time.Since(start)) } func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) { if r == nil { return usage.Record{}, false } model = strings.TrimSpace(model) if model == "" { return usage.Record{}, false } detail = normalizeUsageDetailTotal(detail) if !hasNonZeroTokenUsage(detail) { return usage.Record{}, false } return r.buildRecordForModel(model, detail, false, usage.Failure{}), true } func (r *UsageReporter) PublishFailure(ctx context.Context, errs ...error) { r.publishWithOutcome(ctx, usage.Detail{}, true, failFromErrors(errs...)) } func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) { if r == nil || errPtr == nil { return } if *errPtr != nil { r.PublishFailure(ctx, *errPtr) } } func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool, fail usage.Failure) { if r == nil { return } detail = normalizeUsageDetailTotal(detail) r.once.Do(func() { r.publishRecord(ctx, r.buildRecord(detail, failed, fail)) }) } func normalizeUsageDetailTotal(detail usage.Detail) usage.Detail { if detail.TotalTokens == 0 { total := detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens if total > 0 { detail.TotalTokens = total } } return detail } func hasNonZeroTokenUsage(detail usage.Detail) bool { return detail.InputTokens != 0 || detail.OutputTokens != 0 || detail.ReasoningTokens != 0 || detail.CachedTokens != 0 || detail.CacheReadTokens != 0 || detail.CacheCreationTokens != 0 || detail.TotalTokens != 0 } // ensurePublished guarantees that a usage record is emitted exactly once. // It is safe to call multiple times; only the first call wins due to once.Do. // This is used to ensure request counting even when upstream responses do not // include any usage fields (tokens), especially for streaming paths. func (r *UsageReporter) EnsurePublished(ctx context.Context) { if r == nil { return } r.once.Do(func() { r.publishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{})) }) } func (r *UsageReporter) publishRecord(ctx context.Context, record usage.Record) { record.ResponseHeaders = internallogging.GetResponseHeaders(ctx) usage.PublishRecord(ctx, record) } func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record { var fail usage.Failure if len(failures) > 0 { fail = failures[0] } if r == nil { return usage.Record{Detail: detail, Failed: failed, Fail: fail} } return r.buildRecordForModel(r.model, detail, failed, fail) } func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool, fail usage.Failure) usage.Record { if r == nil { return usage.Record{Model: model, Detail: detail, Failed: failed, Fail: fail} } return usage.Record{ Provider: r.provider, ExecutorType: r.executorType, Model: model, Alias: r.alias, Source: r.source, APIKey: r.apiKey, AuthID: r.authID, AuthIndex: r.authIndex, AuthType: r.authType, ReasoningEffort: r.reasoning, ServiceTier: r.serviceTier, RequestedAt: r.requestedAt, Latency: r.latency(), TTFT: r.ttftDuration(), Failed: failed, Fail: fail, Detail: detail, } } func extractServiceTierFromPayload(payload []byte) string { if len(payload) == 0 { return usage.DefaultServiceTier } for _, path := range []string{"service_tier", "request.service_tier", "response.service_tier"} { serviceTier := strings.TrimSpace(gjson.GetBytes(payload, path).String()) if serviceTier != "" { return serviceTier } } return usage.DefaultServiceTier } func failFromErrors(errs ...error) usage.Failure { for _, err := range errs { if err == nil { continue } fail := usage.Failure{ Body: strings.TrimSpace(err.Error()), } var se interface{ StatusCode() int } if errors.As(err, &se) && se != nil { fail.StatusCode = se.StatusCode() } return fail } return usage.Failure{} } func (r *UsageReporter) latency() time.Duration { if r == nil || r.requestedAt.IsZero() { return 0 } latency := time.Since(r.requestedAt) if latency < 0 { return 0 } return latency } func (r *UsageReporter) setTTFT(ttft time.Duration) { if r == nil { return } if ttft < 0 { ttft = 0 } r.ttftMu.Lock() if r.ttftSet { r.ttftMu.Unlock() return } r.ttft = ttft r.ttftSet = true r.ttftStart = time.Time{} r.ttftMu.Unlock() } func (r *UsageReporter) ttftDuration() time.Duration { if r == nil { return 0 } r.ttftMu.RLock() defer r.ttftMu.RUnlock() return r.ttft } type usageTTFTRoundTripper struct { base http.RoundTripper reporter *UsageReporter } func (t usageTTFTRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { t.reporter.StartResponseTTFT() resp, errRoundTrip := t.base.RoundTrip(req) if errRoundTrip != nil { return resp, errRoundTrip } t.reporter.ObserveResponse(resp) return resp, nil } type usageTTFTReadCloser struct { io.ReadCloser once sync.Once mark func() } func (r *usageTTFTReadCloser) Read(p []byte) (int, error) { if r == nil || r.ReadCloser == nil { return 0, io.ErrClosedPipe } n, errRead := r.ReadCloser.Read(p) if n > 0 && r.mark != nil { r.once.Do(r.mark) } return n, errRead } func APIKeyFromContext(ctx context.Context) string { if ctx == nil { return "" } ginCtx, ok := ctx.Value("gin").(*gin.Context) if !ok || ginCtx == nil { return "" } if v, exists := ginCtx.Get("userApiKey"); exists { switch value := v.(type) { case string: return value case fmt.Stringer: return value.String() default: return fmt.Sprintf("%v", value) } } return "" } func resolveUsageSource(auth *cliproxyauth.Auth, ctxAPIKey string) string { if auth != nil { provider := strings.TrimSpace(auth.Provider) if strings.EqualFold(provider, "gemini-cli") { if id := strings.TrimSpace(auth.ID); id != "" { return id } } if strings.EqualFold(provider, "vertex") { if auth.Metadata != nil { if projectID, ok := auth.Metadata["project_id"].(string); ok { if trimmed := strings.TrimSpace(projectID); trimmed != "" { return trimmed } } if project, ok := auth.Metadata["project"].(string); ok { if trimmed := strings.TrimSpace(project); trimmed != "" { return trimmed } } } } if _, value := auth.AccountInfo(); value != "" { return strings.TrimSpace(value) } if auth.Metadata != nil { if email, ok := auth.Metadata["email"].(string); ok { if trimmed := strings.TrimSpace(email); trimmed != "" { return trimmed } } } if auth.Attributes != nil { if key := strings.TrimSpace(auth.Attributes["api_key"]); key != "" { return key } } } if trimmed := strings.TrimSpace(ctxAPIKey); trimmed != "" { return trimmed } return "" } func resolveUsageAuthType(auth *cliproxyauth.Auth) string { if auth == nil { return "" } kind, _ := auth.AccountInfo() kind = strings.TrimSpace(kind) if kind == "api_key" { return "apikey" } return kind } func ParseCodexUsage(data []byte) (usage.Detail, bool) { usageNode := gjson.ParseBytes(data).Get("response.usage") if !hasOpenAIStyleUsageTokenFields(usageNode) { return usage.Detail{}, false } return parseOpenAIStyleUsageNode(usageNode), true } func ParseCodexImageToolUsage(data []byte) (usage.Detail, bool) { usageNode := gjson.ParseBytes(data).Get("response.tool_usage.image_gen") if !hasOpenAIStyleUsageTokenFields(usageNode) { return usage.Detail{}, false } return parseOpenAIStyleUsageNode(usageNode), true } func ParseOpenAIUsage(data []byte) usage.Detail { usageNode := gjson.ParseBytes(data).Get("usage") if !hasOpenAIStyleUsageTokenFields(usageNode) { return usage.Detail{} } return parseOpenAIStyleUsageNode(usageNode) } func hasOpenAIStyleUsageTokenFields(usageNode gjson.Result) bool { if !usageNode.Exists() || !usageNode.IsObject() { return false } return usageNode.Get("prompt_tokens").Exists() || usageNode.Get("input_tokens").Exists() || usageNode.Get("completion_tokens").Exists() || usageNode.Get("output_tokens").Exists() || usageNode.Get("total_tokens").Exists() || usageNode.Get("prompt_tokens_details.cached_tokens").Exists() || usageNode.Get("input_tokens_details.cached_tokens").Exists() || usageNode.Get("completion_tokens_details.reasoning_tokens").Exists() || usageNode.Get("output_tokens_details.reasoning_tokens").Exists() } func parseOpenAIStyleUsageNode(usageNode gjson.Result) usage.Detail { inputNode := usageNode.Get("prompt_tokens") if !inputNode.Exists() { inputNode = usageNode.Get("input_tokens") } outputNode := usageNode.Get("completion_tokens") if !outputNode.Exists() { outputNode = usageNode.Get("output_tokens") } detail := usage.Detail{ InputTokens: inputNode.Int(), OutputTokens: outputNode.Int(), TotalTokens: usageNode.Get("total_tokens").Int(), } cached := usageNode.Get("prompt_tokens_details.cached_tokens") if !cached.Exists() { cached = usageNode.Get("input_tokens_details.cached_tokens") } if cached.Exists() { detail.CachedTokens = cached.Int() } reasoning := usageNode.Get("completion_tokens_details.reasoning_tokens") if !reasoning.Exists() { reasoning = usageNode.Get("output_tokens_details.reasoning_tokens") } if reasoning.Exists() { detail.ReasoningTokens = reasoning.Int() } return detail } func ParseOpenAIStreamUsage(line []byte) (usage.Detail, bool) { payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { return usage.Detail{}, false } usageNode := gjson.GetBytes(payload, "usage") if !hasOpenAIStyleUsageTokenFields(usageNode) { return usage.Detail{}, false } return parseOpenAIStyleUsageNode(usageNode), true } func ParseClaudeUsage(data []byte) usage.Detail { usageNode := gjson.ParseBytes(data).Get("usage") if !usageNode.Exists() { return usage.Detail{} } return parseClaudeUsageNode(usageNode) } func ParseClaudeStreamUsage(line []byte) (usage.Detail, bool) { payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { return usage.Detail{}, false } usageNode := gjson.GetBytes(payload, "usage") if !usageNode.Exists() { return usage.Detail{}, false } return parseClaudeUsageNode(usageNode), true } func parseClaudeUsageNode(usageNode gjson.Result) usage.Detail { cacheReadTokens := usageNode.Get("cache_read_input_tokens").Int() cacheCreationTokens := usageNode.Get("cache_creation_input_tokens").Int() detail := usage.Detail{ InputTokens: usageNode.Get("input_tokens").Int(), OutputTokens: usageNode.Get("output_tokens").Int(), CachedTokens: cacheReadTokens, CacheReadTokens: cacheReadTokens, CacheCreationTokens: cacheCreationTokens, } if detail.CachedTokens == 0 { detail.CachedTokens = detail.CacheCreationTokens } detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.CacheReadTokens + detail.CacheCreationTokens return detail } func parseGeminiFamilyUsageDetail(node gjson.Result) usage.Detail { detail := usage.Detail{ InputTokens: node.Get("promptTokenCount").Int(), OutputTokens: node.Get("candidatesTokenCount").Int(), ReasoningTokens: node.Get("thoughtsTokenCount").Int(), TotalTokens: node.Get("totalTokenCount").Int(), CachedTokens: node.Get("cachedContentTokenCount").Int(), } if detail.TotalTokens == 0 { detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens } return detail } func hasGeminiFamilyUsageTokenFields(node gjson.Result) bool { return node.Get("promptTokenCount").Exists() || node.Get("candidatesTokenCount").Exists() || node.Get("thoughtsTokenCount").Exists() || node.Get("totalTokenCount").Exists() || node.Get("cachedContentTokenCount").Exists() } func ParseGeminiCLIUsage(data []byte) usage.Detail { usageNode := gjson.ParseBytes(data) node := firstExistingUsageNode(usageNode, "response.usageMetadata", "response.usage_metadata", "usageMetadata", "usage_metadata", ) if !node.Exists() { return usage.Detail{} } return parseGeminiFamilyUsageDetail(node) } func ParseGeminiUsage(data []byte) usage.Detail { usageNode := gjson.ParseBytes(data) node := usageNode.Get("usageMetadata") if !node.Exists() { node = usageNode.Get("usage_metadata") } if !node.Exists() { return usage.Detail{} } return parseGeminiFamilyUsageDetail(node) } func ParseGeminiStreamUsage(line []byte) (usage.Detail, bool) { payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { return usage.Detail{}, false } node := gjson.GetBytes(payload, "usageMetadata") if !node.Exists() { node = gjson.GetBytes(payload, "usage_metadata") } if !node.Exists() { return usage.Detail{}, false } return parseGeminiFamilyUsageDetail(node), true } func ParseGeminiCLIStreamUsage(line []byte) (usage.Detail, bool) { payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { return usage.Detail{}, false } root := gjson.ParseBytes(payload) node := firstExistingUsageNode(root, "response.usageMetadata", "response.usage_metadata", "usageMetadata", "usage_metadata", ) if !node.Exists() { return usage.Detail{}, false } if !hasGeminiFamilyUsageTokenFields(node) { return usage.Detail{}, false } return parseGeminiFamilyUsageDetail(node), true } func firstExistingUsageNode(root gjson.Result, paths ...string) gjson.Result { for _, path := range paths { node := root.Get(path) if node.Exists() { return node } } return gjson.Result{} } func ParseAntigravityUsage(data []byte) usage.Detail { usageNode := gjson.ParseBytes(data) node := usageNode.Get("response.usageMetadata") if !node.Exists() { node = usageNode.Get("usageMetadata") } if !node.Exists() { node = usageNode.Get("usage_metadata") } if !node.Exists() { return usage.Detail{} } return parseGeminiFamilyUsageDetail(node) } func ParseAntigravityStreamUsage(line []byte) (usage.Detail, bool) { payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { return usage.Detail{}, false } node := gjson.GetBytes(payload, "response.usageMetadata") if !node.Exists() { node = gjson.GetBytes(payload, "usageMetadata") } if !node.Exists() { node = gjson.GetBytes(payload, "usage_metadata") } if !node.Exists() { return usage.Detail{}, false } return parseGeminiFamilyUsageDetail(node), true } var stopChunkWithoutUsage sync.Map func rememberStopWithoutUsage(traceID string) { stopChunkWithoutUsage.Store(traceID, struct{}{}) time.AfterFunc(10*time.Minute, func() { stopChunkWithoutUsage.Delete(traceID) }) } // FilterSSEUsageMetadata removes usageMetadata from SSE events that are not // terminal (finishReason != "stop"). Stop chunks are left untouched. This // function is shared between aistudio and antigravity executors. func FilterSSEUsageMetadata(payload []byte) []byte { if len(payload) == 0 { return payload } lines := bytes.Split(payload, []byte("\n")) modified := false foundData := false for idx, line := range lines { trimmed := bytes.TrimSpace(line) if len(trimmed) == 0 || !bytes.HasPrefix(trimmed, []byte("data:")) { continue } foundData = true dataIdx := bytes.Index(line, []byte("data:")) if dataIdx < 0 { continue } rawJSON := bytes.TrimSpace(line[dataIdx+5:]) traceID := gjson.GetBytes(rawJSON, "traceId").String() if isStopChunkWithoutUsage(rawJSON) && traceID != "" { rememberStopWithoutUsage(traceID) continue } if traceID != "" { if _, ok := stopChunkWithoutUsage.Load(traceID); ok && hasUsageMetadata(rawJSON) { stopChunkWithoutUsage.Delete(traceID) continue } } cleaned, changed := StripUsageMetadataFromJSON(rawJSON) if !changed { continue } var rebuilt []byte rebuilt = append(rebuilt, line[:dataIdx]...) rebuilt = append(rebuilt, []byte("data:")...) if len(cleaned) > 0 { rebuilt = append(rebuilt, ' ') rebuilt = append(rebuilt, cleaned...) } lines[idx] = rebuilt modified = true } if !modified { if !foundData { // Handle payloads that are raw JSON without SSE data: prefix. trimmed := bytes.TrimSpace(payload) cleaned, changed := StripUsageMetadataFromJSON(trimmed) if !changed { return payload } return cleaned } return payload } return bytes.Join(lines, []byte("\n")) } // StripUsageMetadataFromJSON drops usageMetadata unless finishReason is present (terminal). // It handles both formats: // - Aistudio: candidates.0.finishReason // - Antigravity: response.candidates.0.finishReason func StripUsageMetadataFromJSON(rawJSON []byte) ([]byte, bool) { jsonBytes := bytes.TrimSpace(rawJSON) if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) { return rawJSON, false } // Check for finishReason in both aistudio and antigravity formats finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason") if !finishReason.Exists() { finishReason = gjson.GetBytes(jsonBytes, "response.candidates.0.finishReason") } terminalReason := finishReason.Exists() && strings.TrimSpace(finishReason.String()) != "" usageMetadata := gjson.GetBytes(jsonBytes, "usageMetadata") if !usageMetadata.Exists() { usageMetadata = gjson.GetBytes(jsonBytes, "response.usageMetadata") } // Terminal chunk: keep as-is. if terminalReason { return rawJSON, false } // Nothing to strip if !usageMetadata.Exists() { return rawJSON, false } // Remove usageMetadata from both possible locations cleaned := jsonBytes var changed bool if usageMetadata = gjson.GetBytes(cleaned, "usageMetadata"); usageMetadata.Exists() { // Rename usageMetadata to cpaUsageMetadata in the message_start event of Claude cleaned, _ = sjson.SetRawBytes(cleaned, "cpaUsageMetadata", []byte(usageMetadata.Raw)) cleaned, _ = sjson.DeleteBytes(cleaned, "usageMetadata") changed = true } if usageMetadata = gjson.GetBytes(cleaned, "response.usageMetadata"); usageMetadata.Exists() { // Rename usageMetadata to cpaUsageMetadata in the message_start event of Claude cleaned, _ = sjson.SetRawBytes(cleaned, "response.cpaUsageMetadata", []byte(usageMetadata.Raw)) cleaned, _ = sjson.DeleteBytes(cleaned, "response.usageMetadata") changed = true } return cleaned, changed } func hasUsageMetadata(jsonBytes []byte) bool { if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) { return false } if gjson.GetBytes(jsonBytes, "usageMetadata").Exists() { return true } if gjson.GetBytes(jsonBytes, "response.usageMetadata").Exists() { return true } return false } func isStopChunkWithoutUsage(jsonBytes []byte) bool { if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) { return false } finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason") if !finishReason.Exists() { finishReason = gjson.GetBytes(jsonBytes, "response.candidates.0.finishReason") } trimmed := strings.TrimSpace(finishReason.String()) if !finishReason.Exists() || trimmed == "" { return false } return !hasUsageMetadata(jsonBytes) } func JSONPayload(line []byte) []byte { return jsonPayload(line) } func jsonPayload(line []byte) []byte { trimmed := bytes.TrimSpace(line) if len(trimmed) == 0 { return nil } if bytes.Equal(trimmed, []byte("[DONE]")) { return nil } if bytes.HasPrefix(trimmed, []byte("event:")) { return nil } if bytes.HasPrefix(trimmed, []byte("data:")) { trimmed = bytes.TrimSpace(trimmed[len("data:"):]) } if len(trimmed) == 0 || trimmed[0] != '{' { return nil } return trimmed }