Files
CLIProxyAPI/internal/runtime/executor/helps/usage_helpers.go
Luis Pater 959067edfb feat(usage): introduce executor type tracking in usage reporting
- Replaced `NewUsageReporter` with `NewExecutorUsageReporter` to include executor type in usage records.
- Updated all executors to use the new reporter implementation.
- Extended `UsageReporter` to track and publish executor type.
- Added tests to validate proper executor type recording and handling.
- Enhanced RedisQueue plugin and payload schema with executor type support.
2026-06-02 00:43:16 +08:00

880 lines
23 KiB
Go

package helps
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"reflect"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
internallogging "github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
"github.com/router-for-me/CLIProxyAPI/v7/internal/thinking"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
"github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
// UsageReporter gathers the attributes of one upstream request and turns them
// into usage.Record values for publication.
type UsageReporter struct {
	// Identity of the request; populated by NewUsageReporter, with
	// executorType filled in afterwards by NewExecutorUsageReporter.
	provider string
	executorType string
	model string
	// alias is the requested model alias from the context, or model when the
	// context carries none.
	alias string
	authID string
	authIndex string
	authType string
	apiKey string
	source string
	// reasoning and serviceTier start from context values and may be
	// overwritten by SetTranslatedReasoningEffort.
	reasoning string
	serviceTier string
	// requestedAt is the construction time; latency is measured from it.
	requestedAt time.Time
	// ttftMu guards ttft, ttftStart and ttftSet.
	ttftMu sync.RWMutex
	ttft time.Duration
	ttftStart time.Time
	ttftSet bool
	// once limits publishWithOutcome and EnsurePublished to a single record
	// between them.
	once sync.Once
}
// usageExecutor is the minimal executor surface NewExecutorUsageReporter
// needs: Identifier supplies the value recorded as the usage provider.
type usageExecutor interface {
	Identifier() string
}
// NewExecutorUsageReporter builds a UsageReporter for a request handled by
// executor. The provider is executor.Identifier() (empty when executor is nil)
// and the executor type is whatever ExecutorTypeName reports for executor.
func NewExecutorUsageReporter(ctx context.Context, executor usageExecutor, model string, auth *cliproxyauth.Auth) *UsageReporter {
	var providerID string
	if executor != nil {
		providerID = executor.Identifier()
	}

	r := NewUsageReporter(ctx, providerID, model, auth)
	r.executorType = ExecutorTypeName(executor)
	return r
}
// NewUsageReporter builds a UsageReporter for the given provider and model.
//
// The API key, requested model alias, reasoning effort and service tier are
// read from ctx; the alias falls back to model when the context has none. The
// usage source and auth type are derived from auth, and when auth is non-nil
// its ID and index are recorded as well. The request start time is captured
// here.
func NewUsageReporter(ctx context.Context, provider, model string, auth *cliproxyauth.Auth) *UsageReporter {
	key := APIKeyFromContext(ctx)
	requested := usage.RequestedModelAliasFromContext(ctx)
	if requested == "" {
		requested = model
	}

	r := new(UsageReporter)
	r.provider = provider
	r.model = model
	r.alias = strings.TrimSpace(requested)
	r.requestedAt = time.Now()
	r.apiKey = key
	r.source = resolveUsageSource(auth, key)
	r.authType = resolveUsageAuthType(auth)
	r.reasoning = usage.ReasoningEffortFromContext(ctx)
	r.serviceTier = usage.ServiceTierFromContext(ctx)
	if auth == nil {
		return r
	}

	r.authID = auth.ID
	r.authIndex = auth.EnsureIndex()
	return r
}
// ExecutorTypeName returns the Go type name of executor with any levels of
// pointer indirection removed. It returns "" for a nil interface value and for
// unnamed types.
func ExecutorTypeName(executor any) string {
	t := reflect.TypeOf(executor)
	if t == nil {
		// reflect.TypeOf yields nil only for a nil interface value.
		return ""
	}
	for ; t.Kind() == reflect.Pointer; t = t.Elem() {
	}
	return strings.TrimSpace(t.Name())
}
// Publish emits a non-failed usage record carrying detail. It delegates to
// publishWithOutcome, which publishes at most once per reporter and is a no-op
// on a nil receiver.
func (r *UsageReporter) Publish(ctx context.Context, detail usage.Detail) {
	r.publishWithOutcome(ctx, detail, false, usage.Failure{})
}
// PublishAdditionalModel publishes an extra usage record attributed to model.
// Nothing is published when buildAdditionalModelRecord rejects the input (nil
// receiver, blank model, or no token usage). The record goes straight to
// publishRecord, so it is not subject to the reporter's once guard.
func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string, detail usage.Detail) {
	if extra, built := r.buildAdditionalModelRecord(model, detail); built {
		r.publishRecord(ctx, extra)
	}
}
// SetTranslatedReasoningEffort replaces the reporter's reasoning effort with
// the value extracted from the translated payload for the given format. It
// also replaces the service tier with the one found in the same payload.
// It is a no-op on a nil receiver.
func (r *UsageReporter) SetTranslatedReasoningEffort(payload []byte, format string) {
	if r == nil {
		return
	}
	effort := thinking.ExtractTranslatedReasoningEffort(payload, format)
	tier := extractServiceTierFromPayload(payload)
	r.reasoning, r.serviceTier = effort, tier
}
// TrackHTTPClient returns a shallow copy of client whose transport is wrapped
// in a usageTTFTRoundTripper bound to this reporter. http.DefaultTransport is
// used as the base when client has no transport. The original client is not
// modified; client is returned as-is when either it or the receiver is nil.
func (r *UsageReporter) TrackHTTPClient(client *http.Client) *http.Client {
	if r == nil || client == nil {
		return client
	}

	base := client.Transport
	if base == nil {
		base = http.DefaultTransport
	}

	clone := *client
	clone.Transport = usageTTFTRoundTripper{base: base, reporter: r}
	return &clone
}
// ObserveResponse starts the TTFT clock (if not already started) and replaces
// resp.Body with a wrapper that calls MarkFirstResponseByte on the first read
// that returns data. It does nothing when the receiver, resp or resp.Body is
// nil.
func (r *UsageReporter) ObserveResponse(resp *http.Response) {
	if r == nil || resp == nil {
		return
	}
	if resp.Body == nil {
		return
	}

	r.StartResponseTTFT()
	wrapped := &usageTTFTReadCloser{
		ReadCloser: resp.Body,
		mark:       r.MarkFirstResponseByte,
	}
	resp.Body = wrapped
}
// StartResponseTTFT records the current time as the start of the
// time-to-first-token measurement. It has no effect once a TTFT value has been
// set or while a start time is already pending, and is a no-op on a nil
// receiver.
func (r *UsageReporter) StartResponseTTFT() {
	if r == nil {
		return
	}

	r.ttftMu.Lock()
	defer r.ttftMu.Unlock()

	if r.ttftSet || !r.ttftStart.IsZero() {
		return
	}
	r.ttftStart = time.Now()
}
// MarkFirstResponseByte finalizes the time-to-first-token measurement as the
// time elapsed since the pending start. It does nothing when a TTFT value is
// already set or when no start time is pending; in the latter case the pending
// start is still cleared.
func (r *UsageReporter) MarkFirstResponseByte() {
	if r == nil {
		return
	}

	// Consume the pending start under the lock, then release it before
	// calling setTTFT, which acquires the same mutex.
	startedAt, pending := func() (time.Time, bool) {
		r.ttftMu.Lock()
		defer r.ttftMu.Unlock()
		if r.ttftSet {
			return time.Time{}, false
		}
		began := r.ttftStart
		r.ttftStart = time.Time{}
		return began, !began.IsZero()
	}()
	if !pending {
		return
	}

	r.setTTFT(time.Since(startedAt))
}
// buildAdditionalModelRecord builds a non-failed usage record for model using
// this reporter's attributes. The boolean is false, with a zero record, when
// the receiver is nil, the trimmed model name is empty, or the normalized
// detail carries no token counts.
func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) {
	var none usage.Record
	if r == nil {
		return none, false
	}

	name := strings.TrimSpace(model)
	if name == "" {
		return none, false
	}

	normalized := normalizeUsageDetailTotal(detail)
	if hasNonZeroTokenUsage(normalized) {
		return r.buildRecordForModel(name, normalized, false, usage.Failure{}), true
	}
	return none, false
}
// PublishFailure emits a failed usage record with empty token detail. The
// failure payload is derived from errs by failFromErrors, which uses the first
// non-nil error. Publication goes through publishWithOutcome and therefore
// happens at most once per reporter.
func (r *UsageReporter) PublishFailure(ctx context.Context, errs ...error) {
	r.publishWithOutcome(ctx, usage.Detail{}, true, failFromErrors(errs...))
}
// TrackFailure publishes a failure record when *errPtr is non-nil at the time
// of the call. It does nothing for a nil receiver, a nil pointer, or a nil
// error. Taking a pointer presumably lets callers register the call (for
// example with defer) before the error value is known.
func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) {
	if r == nil || errPtr == nil || *errPtr == nil {
		return
	}
	r.PublishFailure(ctx, *errPtr)
}
// publishWithOutcome normalizes detail and publishes a single record with the
// given failure state. The reporter's once guard makes every call after the
// first (including EnsurePublished) a no-op. It does nothing on a nil receiver.
func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool, fail usage.Failure) {
	if r == nil {
		return
	}

	normalized := normalizeUsageDetailTotal(detail)
	emit := func() {
		r.publishRecord(ctx, r.buildRecord(normalized, failed, fail))
	}
	r.once.Do(emit)
}
// normalizeUsageDetailTotal fills in TotalTokens when it is zero, using the
// sum of input, output and reasoning tokens if that sum is positive. A
// non-zero TotalTokens is left as reported.
func normalizeUsageDetailTotal(detail usage.Detail) usage.Detail {
	if detail.TotalTokens != 0 {
		return detail
	}
	if sum := detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens; sum > 0 {
		detail.TotalTokens = sum
	}
	return detail
}
// hasNonZeroTokenUsage reports whether any token counter in detail is
// non-zero.
func hasNonZeroTokenUsage(detail usage.Detail) bool {
	allZero := detail.InputTokens == 0 &&
		detail.OutputTokens == 0 &&
		detail.ReasoningTokens == 0 &&
		detail.CachedTokens == 0 &&
		detail.CacheReadTokens == 0 &&
		detail.CacheCreationTokens == 0 &&
		detail.TotalTokens == 0
	return !allZero
}
// EnsurePublished makes sure a usage record has been emitted for this
// reporter. If nothing was published yet it emits a non-failed record with
// empty token detail; otherwise the shared once guard makes it a no-op, so it
// is safe to call repeatedly. This keeps requests counted even when the
// upstream response carried no usage fields, notably on streaming paths.
func (r *UsageReporter) EnsurePublished(ctx context.Context) {
	// An empty detail passes through normalization unchanged, so this is the
	// same once-guarded publish as a successful outcome with no tokens.
	r.publishWithOutcome(ctx, usage.Detail{}, false, usage.Failure{})
}
// publishRecord attaches the response headers obtained from ctx to record and
// hands it to usage.PublishRecord. It does not consult the once guard; callers
// are responsible for de-duplication.
func (r *UsageReporter) publishRecord(ctx context.Context, record usage.Record) {
	record.ResponseHeaders = internallogging.GetResponseHeaders(ctx)
	usage.PublishRecord(ctx, record)
}
// buildRecord builds a usage record for the reporter's own model. Only the
// first element of failures is used; when none is given the failure is the
// zero value. On a nil receiver the record carries just detail, failed and the
// failure.
func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record {
	fail := usage.Failure{}
	if len(failures) != 0 {
		fail = failures[0]
	}
	if r != nil {
		return r.buildRecordForModel(r.model, detail, failed, fail)
	}
	return usage.Record{Detail: detail, Failed: failed, Fail: fail}
}
// buildRecordForModel assembles a usage record for model from the reporter's
// stored attributes, the current latency and the recorded TTFT. On a nil
// receiver only model, detail, failed and fail are populated.
func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool, fail usage.Failure) usage.Record {
	rec := usage.Record{
		Model:  model,
		Detail: detail,
		Failed: failed,
		Fail:   fail,
	}
	if r == nil {
		return rec
	}

	rec.Provider = r.provider
	rec.ExecutorType = r.executorType
	rec.Alias = r.alias
	rec.Source = r.source
	rec.APIKey = r.apiKey
	rec.AuthID = r.authID
	rec.AuthIndex = r.authIndex
	rec.AuthType = r.authType
	rec.ReasoningEffort = r.reasoning
	rec.ServiceTier = r.serviceTier
	rec.RequestedAt = r.requestedAt
	rec.Latency = r.latency()
	rec.TTFT = r.ttftDuration()
	return rec
}
func extractServiceTierFromPayload(payload []byte) string {
if len(payload) == 0 {
return usage.DefaultServiceTier
}
for _, path := range []string{"service_tier", "request.service_tier", "response.service_tier"} {
serviceTier := strings.TrimSpace(gjson.GetBytes(payload, path).String())
if serviceTier != "" {
return serviceTier
}
}
return usage.DefaultServiceTier
}
// failFromErrors converts the first non-nil error in errs into a
// usage.Failure. The body is the trimmed error text, and the status code is
// taken from the first error in the chain that exposes StatusCode() int. With
// no non-nil error it returns the zero Failure.
func failFromErrors(errs ...error) usage.Failure {
	var first error
	for _, candidate := range errs {
		if candidate != nil {
			first = candidate
			break
		}
	}
	if first == nil {
		return usage.Failure{}
	}

	result := usage.Failure{Body: strings.TrimSpace(first.Error())}
	var coded interface{ StatusCode() int }
	if errors.As(first, &coded) && coded != nil {
		result.StatusCode = coded.StatusCode()
	}
	return result
}
// latency returns the time elapsed since the reporter was created, never
// negative. It returns 0 for a nil receiver or an unset request time.
func (r *UsageReporter) latency() time.Duration {
	if r == nil {
		return 0
	}
	if r.requestedAt.IsZero() {
		return 0
	}
	if elapsed := time.Since(r.requestedAt); elapsed > 0 {
		return elapsed
	}
	return 0
}
// setTTFT stores the time-to-first-token value the first time it is called;
// later calls are ignored. Negative durations are stored as 0, and any pending
// start time is cleared. It is a no-op on a nil receiver.
func (r *UsageReporter) setTTFT(ttft time.Duration) {
	if r == nil {
		return
	}

	r.ttftMu.Lock()
	defer r.ttftMu.Unlock()

	if r.ttftSet {
		return
	}
	if ttft < 0 {
		ttft = 0
	}
	r.ttft, r.ttftSet, r.ttftStart = ttft, true, time.Time{}
}
// ttftDuration returns the recorded time-to-first-token, read under the TTFT
// lock. It returns 0 for a nil receiver or when no value has been stored.
func (r *UsageReporter) ttftDuration() time.Duration {
	if r == nil {
		return 0
	}
	r.ttftMu.RLock()
	value := r.ttft
	r.ttftMu.RUnlock()
	return value
}
// usageTTFTRoundTripper wraps an http.RoundTripper so that a UsageReporter can
// time the gap between sending a request and receiving its first body bytes.
type usageTTFTRoundTripper struct {
	base     http.RoundTripper
	reporter *UsageReporter
}

// RoundTrip starts the reporter's TTFT clock, forwards req to the base
// transport, and on success wraps the response body via ObserveResponse. The
// base transport's response and error are returned unchanged.
func (t usageTTFTRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	t.reporter.StartResponseTTFT()

	resp, err := t.base.RoundTrip(req)
	if err == nil {
		t.reporter.ObserveResponse(resp)
	}
	return resp, err
}
// usageTTFTReadCloser wraps a response body and invokes mark exactly once, on
// the first Read that returns at least one byte.
type usageTTFTReadCloser struct {
	io.ReadCloser
	once sync.Once
	mark func()
}

// Read forwards to the wrapped reader and triggers mark the first time data is
// returned. It reports io.ErrClosedPipe when the wrapper or the wrapped reader
// is nil.
func (r *usageTTFTReadCloser) Read(p []byte) (int, error) {
	if r == nil || r.ReadCloser == nil {
		return 0, io.ErrClosedPipe
	}

	count, err := r.ReadCloser.Read(p)
	if count <= 0 || r.mark == nil {
		return count, err
	}
	r.once.Do(r.mark)
	return count, err
}
// APIKeyFromContext returns the "userApiKey" value stored on the *gin.Context
// that ctx carries under the "gin" key. Strings are returned as-is,
// fmt.Stringer values via String(), and anything else formatted with %v. It
// returns "" when ctx is nil, carries no gin context, or the key is absent.
func APIKeyFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	gc, isGin := ctx.Value("gin").(*gin.Context)
	if !isGin || gc == nil {
		return ""
	}

	raw, found := gc.Get("userApiKey")
	if !found {
		return ""
	}
	if text, isString := raw.(string); isString {
		return text
	}
	if stringer, isStringer := raw.(fmt.Stringer); isStringer {
		return stringer.String()
	}
	return fmt.Sprintf("%v", raw)
}
// resolveUsageSource picks the identifier recorded as the usage source.
//
// For a non-nil auth the candidates are tried in this order, and the first
// one that is non-blank after trimming wins:
//  1. auth.ID, when the provider is "gemini-cli" (case-insensitive);
//  2. metadata "project_id", then "project", when the provider is "vertex";
//  3. the value returned by auth.AccountInfo();
//  4. metadata "email";
//  5. attribute "api_key".
//
// If none applies (or auth is nil) the trimmed ctxAPIKey is returned, which
// may be empty.
func resolveUsageSource(auth *cliproxyauth.Auth, ctxAPIKey string) string {
	if auth != nil {
		provider := strings.TrimSpace(auth.Provider)
		if strings.EqualFold(provider, "gemini-cli") {
			if id := strings.TrimSpace(auth.ID); id != "" {
				return id
			}
		}
		if strings.EqualFold(provider, "vertex") && auth.Metadata != nil {
			for _, key := range []string{"project_id", "project"} {
				if project, ok := auth.Metadata[key].(string); ok {
					if trimmed := strings.TrimSpace(project); trimmed != "" {
						return trimmed
					}
				}
			}
		}
		// Trim before testing: a whitespace-only account value must fall
		// through to the remaining candidates instead of yielding "".
		_, account := auth.AccountInfo()
		if trimmed := strings.TrimSpace(account); trimmed != "" {
			return trimmed
		}
		if auth.Metadata != nil {
			if email, ok := auth.Metadata["email"].(string); ok {
				if trimmed := strings.TrimSpace(email); trimmed != "" {
					return trimmed
				}
			}
		}
		if auth.Attributes != nil {
			if key := strings.TrimSpace(auth.Attributes["api_key"]); key != "" {
				return key
			}
		}
	}
	return strings.TrimSpace(ctxAPIKey)
}
// resolveUsageAuthType returns the trimmed account kind reported by
// auth.AccountInfo(), with "api_key" rewritten to "apikey". It returns "" for
// a nil auth.
func resolveUsageAuthType(auth *cliproxyauth.Auth) string {
	if auth == nil {
		return ""
	}
	kind, _ := auth.AccountInfo()
	switch trimmed := strings.TrimSpace(kind); trimmed {
	case "api_key":
		return "apikey"
	default:
		return trimmed
	}
}
// ParseCodexUsage reads token usage from the "response.usage" object of data.
// The boolean is false when that object is missing or has none of the
// recognized OpenAI-style token fields.
func ParseCodexUsage(data []byte) (usage.Detail, bool) {
	root := gjson.ParseBytes(data)
	if node := root.Get("response.usage"); hasOpenAIStyleUsageTokenFields(node) {
		return parseOpenAIStyleUsageNode(node), true
	}
	return usage.Detail{}, false
}
// ParseCodexImageToolUsage reads token usage from the
// "response.tool_usage.image_gen" object of data. The boolean is false when
// that object is missing or has none of the recognized OpenAI-style token
// fields.
func ParseCodexImageToolUsage(data []byte) (usage.Detail, bool) {
	root := gjson.ParseBytes(data)
	if node := root.Get("response.tool_usage.image_gen"); hasOpenAIStyleUsageTokenFields(node) {
		return parseOpenAIStyleUsageNode(node), true
	}
	return usage.Detail{}, false
}
// ParseOpenAIUsage reads token usage from the top-level "usage" object of
// data. It returns a zero Detail when that object is missing or has none of
// the recognized token fields.
func ParseOpenAIUsage(data []byte) usage.Detail {
	root := gjson.ParseBytes(data)
	if node := root.Get("usage"); hasOpenAIStyleUsageTokenFields(node) {
		return parseOpenAIStyleUsageNode(node)
	}
	return usage.Detail{}
}
// hasOpenAIStyleUsageTokenFields reports whether usageNode is a JSON object
// containing at least one of the token fields understood by
// parseOpenAIStyleUsageNode (either the prompt/completion or the input/output
// naming, including the nested cached and reasoning counters).
func hasOpenAIStyleUsageTokenFields(usageNode gjson.Result) bool {
	if !usageNode.Exists() || !usageNode.IsObject() {
		return false
	}
	fields := [...]string{
		"prompt_tokens",
		"input_tokens",
		"completion_tokens",
		"output_tokens",
		"total_tokens",
		"prompt_tokens_details.cached_tokens",
		"input_tokens_details.cached_tokens",
		"completion_tokens_details.reasoning_tokens",
		"output_tokens_details.reasoning_tokens",
	}
	for _, field := range fields {
		if usageNode.Get(field).Exists() {
			return true
		}
	}
	return false
}
// parseOpenAIStyleUsageNode converts an OpenAI-style usage object into a
// usage.Detail. For each counter the prompt/completion field name is
// preferred and the input/output name is the fallback. Cached and reasoning
// counters are only set when one of their fields exists.
func parseOpenAIStyleUsageNode(usageNode gjson.Result) usage.Detail {
	// pick returns the primary field when present, otherwise the fallback
	// (which may itself be absent).
	pick := func(primary, fallback string) gjson.Result {
		if node := usageNode.Get(primary); node.Exists() {
			return node
		}
		return usageNode.Get(fallback)
	}

	var detail usage.Detail
	detail.InputTokens = pick("prompt_tokens", "input_tokens").Int()
	detail.OutputTokens = pick("completion_tokens", "output_tokens").Int()
	detail.TotalTokens = usageNode.Get("total_tokens").Int()

	if cached := pick("prompt_tokens_details.cached_tokens", "input_tokens_details.cached_tokens"); cached.Exists() {
		detail.CachedTokens = cached.Int()
	}
	if reasoning := pick("completion_tokens_details.reasoning_tokens", "output_tokens_details.reasoning_tokens"); reasoning.Exists() {
		detail.ReasoningTokens = reasoning.Int()
	}
	return detail
}
// ParseOpenAIStreamUsage extracts token usage from one stream line. The line
// is reduced to its JSON payload by jsonPayload; the boolean is false when
// there is no valid JSON object or its "usage" object has no recognized token
// fields.
func ParseOpenAIStreamUsage(line []byte) (usage.Detail, bool) {
	body := jsonPayload(line)
	if len(body) == 0 {
		return usage.Detail{}, false
	}
	if !gjson.ValidBytes(body) {
		return usage.Detail{}, false
	}
	if node := gjson.GetBytes(body, "usage"); hasOpenAIStyleUsageTokenFields(node) {
		return parseOpenAIStyleUsageNode(node), true
	}
	return usage.Detail{}, false
}
// ParseClaudeUsage reads token usage from the top-level "usage" field of
// data, returning a zero Detail when the field is absent.
func ParseClaudeUsage(data []byte) usage.Detail {
	if node := gjson.ParseBytes(data).Get("usage"); node.Exists() {
		return parseClaudeUsageNode(node)
	}
	return usage.Detail{}
}
// ParseClaudeStreamUsage extracts token usage from one stream line. The
// boolean is false when the line has no valid JSON object payload or the
// payload has no top-level "usage" field.
func ParseClaudeStreamUsage(line []byte) (usage.Detail, bool) {
	body := jsonPayload(line)
	if len(body) == 0 {
		return usage.Detail{}, false
	}
	if !gjson.ValidBytes(body) {
		return usage.Detail{}, false
	}
	if node := gjson.GetBytes(body, "usage"); node.Exists() {
		return parseClaudeUsageNode(node), true
	}
	return usage.Detail{}, false
}
// parseClaudeUsageNode converts a Claude usage object into a usage.Detail.
// CachedTokens is the cache-read count, or the cache-creation count when the
// read count is zero. TotalTokens is the sum of input, output, cache-read and
// cache-creation tokens.
func parseClaudeUsageNode(usageNode gjson.Result) usage.Detail {
	read := usageNode.Get("cache_read_input_tokens").Int()
	created := usageNode.Get("cache_creation_input_tokens").Int()
	input := usageNode.Get("input_tokens").Int()
	output := usageNode.Get("output_tokens").Int()

	cached := read
	if cached == 0 {
		cached = created
	}

	return usage.Detail{
		InputTokens:         input,
		OutputTokens:        output,
		CachedTokens:        cached,
		CacheReadTokens:     read,
		CacheCreationTokens: created,
		TotalTokens:         input + output + read + created,
	}
}
// parseGeminiFamilyUsageDetail converts a Gemini-style usage metadata object
// into a usage.Detail. When totalTokenCount is zero or absent, the total is
// computed as prompt + candidates + thoughts tokens.
func parseGeminiFamilyUsageDetail(node gjson.Result) usage.Detail {
	count := func(field string) int64 { return node.Get(field).Int() }

	var detail usage.Detail
	detail.InputTokens = count("promptTokenCount")
	detail.OutputTokens = count("candidatesTokenCount")
	detail.ReasoningTokens = count("thoughtsTokenCount")
	detail.TotalTokens = count("totalTokenCount")
	detail.CachedTokens = count("cachedContentTokenCount")

	if detail.TotalTokens == 0 {
		detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens
	}
	return detail
}
// hasGeminiFamilyUsageTokenFields reports whether node contains at least one
// of the counters read by parseGeminiFamilyUsageDetail.
func hasGeminiFamilyUsageTokenFields(node gjson.Result) bool {
	fields := [...]string{
		"promptTokenCount",
		"candidatesTokenCount",
		"thoughtsTokenCount",
		"totalTokenCount",
		"cachedContentTokenCount",
	}
	for _, field := range fields {
		if node.Get(field).Exists() {
			return true
		}
	}
	return false
}
// ParseGeminiCLIUsage reads token usage from data, looking for the usage
// metadata under "response.usageMetadata", "response.usage_metadata",
// "usageMetadata" and "usage_metadata" in that order. It returns a zero Detail
// when none of them exists.
func ParseGeminiCLIUsage(data []byte) usage.Detail {
	root := gjson.ParseBytes(data)
	metadata := firstExistingUsageNode(root,
		"response.usageMetadata",
		"response.usage_metadata",
		"usageMetadata",
		"usage_metadata",
	)
	if metadata.Exists() {
		return parseGeminiFamilyUsageDetail(metadata)
	}
	return usage.Detail{}
}
// ParseGeminiUsage reads token usage from the top-level "usageMetadata" field
// of data, falling back to "usage_metadata". It returns a zero Detail when
// neither exists.
func ParseGeminiUsage(data []byte) usage.Detail {
	root := gjson.ParseBytes(data)
	metadata := firstExistingUsageNode(root, "usageMetadata", "usage_metadata")
	if metadata.Exists() {
		return parseGeminiFamilyUsageDetail(metadata)
	}
	return usage.Detail{}
}
// ParseGeminiStreamUsage extracts token usage from one stream line, reading
// "usageMetadata" or, failing that, "usage_metadata" from the line's JSON
// payload. The boolean is false when there is no valid JSON object or neither
// field exists.
func ParseGeminiStreamUsage(line []byte) (usage.Detail, bool) {
	body := jsonPayload(line)
	if len(body) == 0 || !gjson.ValidBytes(body) {
		return usage.Detail{}, false
	}
	for _, path := range [...]string{"usageMetadata", "usage_metadata"} {
		if metadata := gjson.GetBytes(body, path); metadata.Exists() {
			return parseGeminiFamilyUsageDetail(metadata), true
		}
	}
	return usage.Detail{}, false
}
// ParseGeminiCLIStreamUsage extracts token usage from one stream line. The
// usage metadata is looked up under "response.usageMetadata",
// "response.usage_metadata", "usageMetadata" and "usage_metadata" in that
// order. The boolean is false when the line has no valid JSON object, no
// metadata node exists, or the first existing node has no token counters.
func ParseGeminiCLIStreamUsage(line []byte) (usage.Detail, bool) {
	body := jsonPayload(line)
	if len(body) == 0 || !gjson.ValidBytes(body) {
		return usage.Detail{}, false
	}

	metadata := firstExistingUsageNode(gjson.ParseBytes(body),
		"response.usageMetadata",
		"response.usage_metadata",
		"usageMetadata",
		"usage_metadata",
	)
	if metadata.Exists() && hasGeminiFamilyUsageTokenFields(metadata) {
		return parseGeminiFamilyUsageDetail(metadata), true
	}
	return usage.Detail{}, false
}
// firstExistingUsageNode returns the result for the first path in paths that
// exists under root, or a zero gjson.Result (for which Exists is false) when
// none does.
func firstExistingUsageNode(root gjson.Result, paths ...string) gjson.Result {
	for i := range paths {
		if found := root.Get(paths[i]); found.Exists() {
			return found
		}
	}
	return gjson.Result{}
}
// ParseAntigravityUsage reads token usage from data, looking for the usage
// metadata under "response.usageMetadata", "usageMetadata" and
// "usage_metadata" in that order. It returns a zero Detail when none exists.
func ParseAntigravityUsage(data []byte) usage.Detail {
	root := gjson.ParseBytes(data)
	metadata := firstExistingUsageNode(root,
		"response.usageMetadata",
		"usageMetadata",
		"usage_metadata",
	)
	if metadata.Exists() {
		return parseGeminiFamilyUsageDetail(metadata)
	}
	return usage.Detail{}
}
// ParseAntigravityStreamUsage extracts token usage from one stream line,
// reading the first of "response.usageMetadata", "usageMetadata" and
// "usage_metadata" that exists in the line's JSON payload. The boolean is
// false when there is no valid JSON object or none of the fields exists.
func ParseAntigravityStreamUsage(line []byte) (usage.Detail, bool) {
	body := jsonPayload(line)
	if len(body) == 0 || !gjson.ValidBytes(body) {
		return usage.Detail{}, false
	}
	for _, path := range [...]string{"response.usageMetadata", "usageMetadata", "usage_metadata"} {
		if metadata := gjson.GetBytes(body, path); metadata.Exists() {
			return parseGeminiFamilyUsageDetail(metadata), true
		}
	}
	return usage.Detail{}, false
}
// stopChunkWithoutUsage holds the trace IDs of streams whose stop chunk
// arrived without usage metadata. Keys are trace IDs; values are empty
// structs.
var stopChunkWithoutUsage sync.Map

// rememberStopWithoutUsage records traceID in stopChunkWithoutUsage and
// schedules its removal ten minutes later so entries do not accumulate when
// no follow-up chunk ever clears them.
func rememberStopWithoutUsage(traceID string) {
	const retention = 10 * time.Minute

	stopChunkWithoutUsage.Store(traceID, struct{}{})
	forget := func() { stopChunkWithoutUsage.Delete(traceID) }
	time.AfterFunc(retention, forget)
}
// FilterSSEUsageMetadata hides usageMetadata on non-terminal SSE events by
// passing each "data:" line's JSON through StripUsageMetadataFromJSON. Chunks
// that carry a non-empty finishReason are left untouched. This function is
// shared between aistudio and antigravity executors.
//
// Two trace-ID based exceptions leave a line untouched as well:
//   - a chunk with a finishReason but no usage metadata has its traceId
//     remembered (only when the traceId is non-empty);
//   - a later chunk with the same traceId that does carry usage metadata is
//     passed through as-is, and the remembered traceId is forgotten.
//
// When payload contains no "data:" line at all it is treated as one raw JSON
// document. The input is returned unchanged when nothing was rewritten.
func FilterSSEUsageMetadata(payload []byte) []byte {
	if len(payload) == 0 {
		return payload
	}
	lines := bytes.Split(payload, []byte("\n"))
	modified := false
	foundData := false
	for idx, line := range lines {
		trimmed := bytes.TrimSpace(line)
		if len(trimmed) == 0 || !bytes.HasPrefix(trimmed, []byte("data:")) {
			continue
		}
		foundData = true
		// Locate the prefix in the untrimmed line so that any leading
		// whitespace can be preserved when the line is rebuilt.
		dataIdx := bytes.Index(line, []byte("data:"))
		if dataIdx < 0 {
			continue
		}
		rawJSON := bytes.TrimSpace(line[dataIdx+5:])
		traceID := gjson.GetBytes(rawJSON, "traceId").String()
		// Stop chunk without usage: remember the trace so the usage that
		// follows it is not stripped.
		if isStopChunkWithoutUsage(rawJSON) && traceID != "" {
			rememberStopWithoutUsage(traceID)
			continue
		}
		if traceID != "" {
			// Usage arriving after a remembered stop chunk is kept intact.
			if _, ok := stopChunkWithoutUsage.Load(traceID); ok && hasUsageMetadata(rawJSON) {
				stopChunkWithoutUsage.Delete(traceID)
				continue
			}
		}
		cleaned, changed := StripUsageMetadataFromJSON(rawJSON)
		if !changed {
			continue
		}
		// Rebuild as <original leading bytes> + "data:" + " " + cleaned JSON.
		var rebuilt []byte
		rebuilt = append(rebuilt, line[:dataIdx]...)
		rebuilt = append(rebuilt, []byte("data:")...)
		if len(cleaned) > 0 {
			rebuilt = append(rebuilt, ' ')
			rebuilt = append(rebuilt, cleaned...)
		}
		lines[idx] = rebuilt
		modified = true
	}
	if !modified {
		if !foundData {
			// Handle payloads that are raw JSON without SSE data: prefix.
			trimmed := bytes.TrimSpace(payload)
			cleaned, changed := StripUsageMetadataFromJSON(trimmed)
			if !changed {
				return payload
			}
			return cleaned
		}
		return payload
	}
	return bytes.Join(lines, []byte("\n"))
}
// StripUsageMetadataFromJSON hides usageMetadata on non-terminal chunks by
// moving it to a sibling key named cpaUsageMetadata. A chunk is terminal when
// it has a non-blank finishReason; both layouts are checked:
//   - Aistudio: candidates.0.finishReason / usageMetadata
//   - Antigravity: response.candidates.0.finishReason / response.usageMetadata
//
// It returns the rewritten JSON (whitespace-trimmed) and true when at least
// one usageMetadata object was moved. In every other case it returns rawJSON
// unchanged and false: empty or invalid JSON, a terminal chunk, no
// usageMetadata present, or a failure while editing the document.
func StripUsageMetadataFromJSON(rawJSON []byte) ([]byte, bool) {
	jsonBytes := bytes.TrimSpace(rawJSON)
	if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) {
		return rawJSON, false
	}

	// Check for finishReason in both aistudio and antigravity formats.
	finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason")
	if !finishReason.Exists() {
		finishReason = gjson.GetBytes(jsonBytes, "response.candidates.0.finishReason")
	}
	// Terminal chunk: keep as-is.
	if finishReason.Exists() && strings.TrimSpace(finishReason.String()) != "" {
		return rawJSON, false
	}

	renames := [...]struct{ from, to string }{
		{"usageMetadata", "cpaUsageMetadata"},
		{"response.usageMetadata", "response.cpaUsageMetadata"},
	}

	cleaned := jsonBytes
	changed := false
	for _, rename := range renames {
		node := gjson.GetBytes(cleaned, rename.from)
		if !node.Exists() {
			continue
		}
		// A failed edit must never be reported as a successful rewrite, so
		// fall back to the untouched input instead of a partial document.
		moved, err := sjson.SetRawBytes(cleaned, rename.to, []byte(node.Raw))
		if err != nil {
			return rawJSON, false
		}
		moved, err = sjson.DeleteBytes(moved, rename.from)
		if err != nil {
			return rawJSON, false
		}
		cleaned = moved
		changed = true
	}

	// Nothing to strip.
	if !changed {
		return rawJSON, false
	}
	return cleaned, true
}
// hasUsageMetadata reports whether jsonBytes is valid JSON containing a
// "usageMetadata" or "response.usageMetadata" field.
func hasUsageMetadata(jsonBytes []byte) bool {
	if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) {
		return false
	}
	return gjson.GetBytes(jsonBytes, "usageMetadata").Exists() ||
		gjson.GetBytes(jsonBytes, "response.usageMetadata").Exists()
}
// isStopChunkWithoutUsage reports whether jsonBytes is valid JSON that has a
// non-blank finishReason (at candidates.0 or response.candidates.0) but no
// usage metadata.
func isStopChunkWithoutUsage(jsonBytes []byte) bool {
	if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) {
		return false
	}

	reason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason")
	if !reason.Exists() {
		reason = gjson.GetBytes(jsonBytes, "response.candidates.0.finishReason")
	}
	if reason.Exists() && strings.TrimSpace(reason.String()) != "" {
		return !hasUsageMetadata(jsonBytes)
	}
	return false
}
// JSONPayload is the exported form of jsonPayload: it returns the JSON object
// carried by a stream line, or nil when the line holds none.
func JSONPayload(line []byte) []byte {
	return jsonPayload(line)
}
// jsonPayload extracts the JSON object from a single stream line. Surrounding
// whitespace and an optional "data:" prefix are removed. It returns nil for
// blank lines, the "[DONE]" sentinel, "event:" lines, and anything whose
// remaining content does not start with '{'. The result aliases line.
func jsonPayload(line []byte) []byte {
	body := bytes.TrimSpace(line)
	switch {
	case len(body) == 0,
		bytes.Equal(body, []byte("[DONE]")),
		bytes.HasPrefix(body, []byte("event:")):
		return nil
	}

	if bytes.HasPrefix(body, []byte("data:")) {
		body = bytes.TrimSpace(body[len("data:"):])
	}
	if len(body) > 0 && body[0] == '{' {
		return body
	}
	return nil
}