feat(translator): add usage token details for cache input/output aggregation

- Introduced `claudeResponsesUsageTokens` to manage detailed token statistics, including input, output, cache read, and cache creation tokens.
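- Updated aggregation logic so the reported input and total token counts include cache-creation and cache-read tokens, and `input_tokens_details.cached_tokens` reports the cache-read count instead of a hard-coded 0.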
- Refactored usage processing to simplify token merging and ensure consistent handling across streaming and non-streaming responses.
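- Added unit tests (`TestConvertClaudeResponseToOpenAIResponses_ReportsCacheTokens` and `TestConvertClaudeResponseToOpenAIResponsesNonStream_ReportsCacheTokens`) to verify correct cache token inclusion in usage metrics for both streaming and non-streaming conversion.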

Closes: #3807
This commit is contained in:
Luis Pater
2026-06-12 21:03:39 +08:00
parent 5633c93622
commit 9dbf4cd07e
2 changed files with 111 additions and 43 deletions

View File

@@ -39,13 +39,46 @@ type claudeToResponsesState struct {
ReasoningPartAdded bool
ReasoningIndex int
// usage aggregation
InputTokens int64
OutputTokens int64
UsageSeen bool
Usage claudeResponsesUsageTokens
}
// claudeResponsesUsageTokens holds the token counters read from a Claude
// `usage` object. Merge fills it in; OpenAIResponsesUsage converts it into the
// figures written to the OpenAI Responses usage block.
type claudeResponsesUsageTokens struct {
// InputTokens is the most recently merged `input_tokens` value.
InputTokens int64
// OutputTokens is the most recently merged `output_tokens` value.
OutputTokens int64
// CacheCreationInputTokens is the most recently merged
// `cache_creation_input_tokens` value.
CacheCreationInputTokens int64
// CacheReadInputTokens is the most recently merged
// `cache_read_input_tokens` value; it is what gets reported as cached tokens.
CacheReadInputTokens int64
// HasUsage becomes true once Merge has been given a usage value that exists,
// regardless of which individual counters that value carried.
HasUsage bool
}
// dataTag is the literal byte sequence "data:". Presumably the SSE field prefix
// matched against incoming stream lines — its use sites are outside this view,
// so confirm before relying on that.
var dataTag = []byte("data:")
// Merge folds a Claude `usage` JSON value into u.
//
// A usage value that does not exist is ignored entirely. Otherwise HasUsage is
// set and every counter present in usage overwrites (does not add to) the
// stored value; counters absent from usage keep whatever was merged earlier.
func (u *claudeResponsesUsageTokens) Merge(usage gjson.Result) {
	if !usage.Exists() {
		return
	}
	u.HasUsage = true

	// Each Claude usage key paired with the counter it updates.
	counters := [...]struct {
		key    string
		target *int64
	}{
		{"input_tokens", &u.InputTokens},
		{"output_tokens", &u.OutputTokens},
		{"cache_creation_input_tokens", &u.CacheCreationInputTokens},
		{"cache_read_input_tokens", &u.CacheReadInputTokens},
	}
	for _, c := range counters {
		value := usage.Get(c.key)
		if !value.Exists() {
			continue
		}
		*c.target = value.Int()
	}
}
// OpenAIResponsesUsage converts the stored Claude counters into the four
// figures used for the OpenAI Responses usage block.
//
// inputTokens is the sum of the plain input, cache-creation and cache-read
// counters; outputTokens is the stored output counter; totalTokens is
// inputTokens plus outputTokens; cachedTokens is the cache-read counter alone.
func (u claudeResponsesUsageTokens) OpenAIResponsesUsage() (inputTokens, outputTokens, totalTokens, cachedTokens int64) {
	promptTokens := u.InputTokens + u.CacheCreationInputTokens + u.CacheReadInputTokens
	return promptTokens, u.OutputTokens, promptTokens + u.OutputTokens, u.CacheReadInputTokens
}
func pickRequestJSON(originalRequestRawJSON, requestRawJSON []byte) []byte {
if len(originalRequestRawJSON) > 0 && gjson.ValidBytes(originalRequestRawJSON) {
return originalRequestRawJSON
@@ -153,19 +186,8 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
st.FuncArgsBuf = make(map[int]*strings.Builder)
st.FuncNames = make(map[int]string)
st.FuncCallIDs = make(map[int]string)
st.InputTokens = 0
st.OutputTokens = 0
st.UsageSeen = false
if usage := msg.Get("usage"); usage.Exists() {
if v := usage.Get("input_tokens"); v.Exists() {
st.InputTokens = v.Int()
st.UsageSeen = true
}
if v := usage.Get("output_tokens"); v.Exists() {
st.OutputTokens = v.Int()
st.UsageSeen = true
}
}
st.Usage = claudeResponsesUsageTokens{}
st.Usage.Merge(msg.Get("usage"))
// response.created
created := []byte(`{"type":"response.created","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress","background":false,"error":null,"output":[]}}`)
created, _ = sjson.SetBytes(created, "sequence_number", nextSeq())
@@ -361,16 +383,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
}
return noSSEOutput(out)
case "message_delta":
if usage := root.Get("usage"); usage.Exists() {
if v := usage.Get("output_tokens"); v.Exists() {
st.OutputTokens = v.Int()
st.UsageSeen = true
}
if v := usage.Get("input_tokens"); v.Exists() {
st.InputTokens = v.Int()
st.UsageSeen = true
}
}
st.Usage.Merge(root.Get("usage"))
return [][]byte{}
case "message_stop":
out = append(out, st.finalizeAssistantMessage(nextSeq)...)
@@ -511,17 +524,17 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
if st.ReasoningBuf.Len() > 0 {
reasoningTokens = int64(st.ReasoningBuf.Len() / 4)
}
usagePresent := st.UsageSeen || reasoningTokens > 0
usagePresent := st.Usage.HasUsage || reasoningTokens > 0
if usagePresent {
completed, _ = sjson.SetBytes(completed, "response.usage.input_tokens", st.InputTokens)
completed, _ = sjson.SetBytes(completed, "response.usage.input_tokens_details.cached_tokens", 0)
completed, _ = sjson.SetBytes(completed, "response.usage.output_tokens", st.OutputTokens)
inputTokens, outputTokens, totalTokens, cachedTokens := st.Usage.OpenAIResponsesUsage()
completed, _ = sjson.SetBytes(completed, "response.usage.input_tokens", inputTokens)
completed, _ = sjson.SetBytes(completed, "response.usage.input_tokens_details.cached_tokens", cachedTokens)
completed, _ = sjson.SetBytes(completed, "response.usage.output_tokens", outputTokens)
if reasoningTokens > 0 {
completed, _ = sjson.SetBytes(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoningTokens)
}
total := st.InputTokens + st.OutputTokens
if total > 0 || st.UsageSeen {
completed, _ = sjson.SetBytes(completed, "response.usage.total_tokens", total)
if totalTokens > 0 || st.Usage.HasUsage {
completed, _ = sjson.SetBytes(completed, "response.usage.total_tokens", totalTokens)
}
}
out = append(out, emitEvent("response.completed", completed))
@@ -568,8 +581,7 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
reasoningItemID string
reasoningSig string
annotations []any
inputTokens int64
outputTokens int64
usageTokens claudeResponsesUsageTokens
)
// Per-index tool call aggregation
@@ -590,9 +602,7 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
if msg := root.Get("message"); msg.Exists() {
responseID = msg.Get("id").String()
createdAt = time.Now().Unix()
if usage := msg.Get("usage"); usage.Exists() {
inputTokens = usage.Get("input_tokens").Int()
}
usageTokens.Merge(msg.Get("usage"))
}
case "content_block_start":
@@ -665,9 +675,7 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
_ = root
case "message_delta":
if usage := root.Get("usage"); usage.Exists() {
outputTokens = usage.Get("output_tokens").Int()
}
usageTokens.Merge(root.Get("usage"))
}
}
@@ -795,10 +803,11 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string
}
// Usage
total := inputTokens + outputTokens
inputTokens, outputTokens, totalTokens, cachedTokens := usageTokens.OpenAIResponsesUsage()
out, _ = sjson.SetBytes(out, "usage.input_tokens", inputTokens)
out, _ = sjson.SetBytes(out, "usage.input_tokens_details.cached_tokens", cachedTokens)
out, _ = sjson.SetBytes(out, "usage.output_tokens", outputTokens)
out, _ = sjson.SetBytes(out, "usage.total_tokens", total)
out, _ = sjson.SetBytes(out, "usage.total_tokens", totalTokens)
if reasoningBuf.Len() > 0 {
// Rough estimate similar to chat completions
reasoningTokens := int64(len(reasoningBuf.String()) / 4)

View File

@@ -166,6 +166,41 @@ func TestConvertClaudeResponseToOpenAIResponses_AggregatesTextBlocksUntilMessage
}
}
// TestConvertClaudeResponseToOpenAIResponses_ReportsCacheTokens feeds a
// message_start / message_delta / message_stop sequence through the streaming
// converter and checks the usage block of the response.completed event.
//
// The message_delta chunk carries newer cache counters than message_start, so
// the expected input total is 13 + 31 + 22000 = 22044 with 22000 cached tokens.
func TestConvertClaudeResponseToOpenAIResponses_ReportsCacheTokens(t *testing.T) {
	stream := [][]byte{
		[]byte(`data: {"type":"message_start","message":{"id":"msg_123","usage":{"input_tokens":13,"output_tokens":1,"cache_read_input_tokens":100,"cache_creation_input_tokens":7}}}`),
		[]byte(`data: {"type":"message_delta","usage":{"output_tokens":4,"cache_read_input_tokens":22000,"cache_creation_input_tokens":31}}`),
		[]byte(`data: {"type":"message_stop"}`),
	}

	var (
		state any
		final gjson.Result
	)
	for i := range stream {
		emitted := ConvertClaudeResponseToOpenAIResponses(context.Background(), "claude-test", nil, nil, stream[i], &state)
		for _, raw := range emitted {
			name, payload := parseClaudeResponsesSSEEvent(t, raw)
			if name != "response.completed" {
				continue
			}
			// Keep the last response.completed payload seen.
			final = payload
		}
	}
	if !final.Exists() {
		t.Fatal("expected response.completed event")
	}

	// Checked in order; the first mismatch aborts the test.
	expectations := []struct {
		path   string
		format string
		want   int64
	}{
		{"response.usage.input_tokens", "response usage input_tokens = %d, want %d", 22044},
		{"response.usage.input_tokens_details.cached_tokens", "response usage cached_tokens = %d, want %d", 22000},
		{"response.usage.output_tokens", "response usage output_tokens = %d, want %d", 4},
		{"response.usage.total_tokens", "response usage total_tokens = %d, want %d", 22048},
	}
	for _, e := range expectations {
		got := final.Get(e.path).Int()
		if got != e.want {
			t.Fatalf(e.format, got, e.want)
		}
	}
}
func TestConvertClaudeResponseToOpenAIResponsesNonStream_ThinkingIncludesSignature(t *testing.T) {
signature := "claude_sig_nonstream"
raw := []byte(strings.Join([]string{
@@ -187,3 +222,27 @@ func TestConvertClaudeResponseToOpenAIResponsesNonStream_ThinkingIncludesSignatu
t.Fatalf("non-stream reasoning summary text = %q", got)
}
}
// TestConvertClaudeResponseToOpenAIResponsesNonStream_ReportsCacheTokens runs a
// newline-joined SSE transcript through the non-streaming converter and checks
// the top-level usage block.
//
// The cache counters arrive only in message_start and the final output count
// only in message_delta, so the expected input total is 13 + 31 + 22000 = 22044
// with 4 output tokens.
func TestConvertClaudeResponseToOpenAIResponsesNonStream_ReportsCacheTokens(t *testing.T) {
	lines := []string{
		`data: {"type":"message_start","message":{"id":"msg_nonstream","usage":{"input_tokens":13,"output_tokens":1,"cache_read_input_tokens":22000,"cache_creation_input_tokens":31}}}`,
		`data: {"type":"message_delta","usage":{"output_tokens":4}}`,
		`data: {"type":"message_stop"}`,
	}
	transcript := []byte(strings.Join(lines, "\n"))

	converted := ConvertClaudeResponseToOpenAIResponsesNonStream(context.Background(), "claude-test", nil, nil, transcript, nil)
	parsed := gjson.ParseBytes(converted)

	// Checked in order; the first mismatch aborts the test.
	expectations := []struct {
		path   string
		format string
		want   int64
	}{
		{"usage.input_tokens", "non-stream usage input_tokens = %d, want %d", 22044},
		{"usage.input_tokens_details.cached_tokens", "non-stream usage cached_tokens = %d, want %d", 22000},
		{"usage.output_tokens", "non-stream usage output_tokens = %d, want %d", 4},
		{"usage.total_tokens", "non-stream usage total_tokens = %d, want %d", 22048},
	}
	for _, e := range expectations {
		got := parsed.Get(e.path).Int()
		if got != e.want {
			t.Fatalf(e.format, got, e.want)
		}
	}
}