From d687ee27772ac3541048c9ad60bee834aaac7356 Mon Sep 17 00:00:00 2001 From: Ravens2121 Date: Thu, 18 Dec 2025 04:38:22 +0800 Subject: [PATCH] feat(kiro): implement official reasoningContentEvent and improve metadat --- internal/runtime/executor/kiro_executor.go | 878 +++++++++--------- .../kiro/claude/kiro_claude_request.go | 15 +- .../kiro/openai/kiro_openai_request.go | 15 +- 3 files changed, 431 insertions(+), 477 deletions(-) diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index e346b744..1da7f25b 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -1293,17 +1293,66 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroclaude.Ki log.Debugf("kiro: parseEventStream found stopReason in messageStopEvent: %s", stopReason) } - case "messageMetadataEvent": - // Handle message metadata events which may contain token counts - if metadata, ok := event["messageMetadataEvent"].(map[string]interface{}); ok { + case "messageMetadataEvent", "metadataEvent": + // Handle message metadata events which contain token counts + // Official format: { tokenUsage: { outputTokens, totalTokens, uncachedInputTokens, cacheReadInputTokens, cacheWriteInputTokens, contextUsagePercentage } } + var metadata map[string]interface{} + if m, ok := event["messageMetadataEvent"].(map[string]interface{}); ok { + metadata = m + } else if m, ok := event["metadataEvent"].(map[string]interface{}); ok { + metadata = m + } else { + metadata = event // event itself might be the metadata + } + + // Check for nested tokenUsage object (official format) + if tokenUsage, ok := metadata["tokenUsage"].(map[string]interface{}); ok { + // outputTokens - precise output token count + if outputTokens, ok := tokenUsage["outputTokens"].(float64); ok { + usageInfo.OutputTokens = int64(outputTokens) + log.Infof("kiro: parseEventStream found precise outputTokens in tokenUsage: %d", usageInfo.OutputTokens) + } + // totalTokens - precise total token count + if totalTokens, ok := tokenUsage["totalTokens"].(float64); ok { + usageInfo.TotalTokens = int64(totalTokens) + log.Infof("kiro: parseEventStream found precise totalTokens in tokenUsage: %d", usageInfo.TotalTokens) + } + // uncachedInputTokens - input tokens not from cache + if uncachedInputTokens, ok := tokenUsage["uncachedInputTokens"].(float64); ok { + usageInfo.InputTokens = int64(uncachedInputTokens) + log.Infof("kiro: parseEventStream found uncachedInputTokens in tokenUsage: %d", usageInfo.InputTokens) + } + // cacheReadInputTokens - tokens read from cache + if cacheReadTokens, ok := tokenUsage["cacheReadInputTokens"].(float64); ok { + // Add to input tokens if we have uncached tokens, otherwise use as input + if usageInfo.InputTokens > 0 { + usageInfo.InputTokens += int64(cacheReadTokens) + } else { + usageInfo.InputTokens = int64(cacheReadTokens) + } + log.Debugf("kiro: parseEventStream found cacheReadInputTokens in tokenUsage: %d", int64(cacheReadTokens)) + } + // contextUsagePercentage - can be used as fallback for input token estimation + if ctxPct, ok := tokenUsage["contextUsagePercentage"].(float64); ok { + upstreamContextPercentage = ctxPct + log.Debugf("kiro: parseEventStream found contextUsagePercentage in tokenUsage: %.2f%%", ctxPct) + } + } + + // Fallback: check for direct fields in metadata (legacy format) + if usageInfo.InputTokens == 0 { if inputTokens, ok := metadata["inputTokens"].(float64); ok { usageInfo.InputTokens = int64(inputTokens) log.Debugf("kiro: parseEventStream found inputTokens in messageMetadataEvent: %d", usageInfo.InputTokens) } + } + if usageInfo.OutputTokens == 0 { if outputTokens, ok := metadata["outputTokens"].(float64); ok { usageInfo.OutputTokens = int64(outputTokens) log.Debugf("kiro: parseEventStream found outputTokens in messageMetadataEvent: %d", usageInfo.OutputTokens) } + } + if usageInfo.TotalTokens == 0 { if totalTokens, ok := metadata["totalTokens"].(float64); ok { usageInfo.TotalTokens = int64(totalTokens) log.Debugf("kiro: parseEventStream found totalTokens in messageMetadataEvent: %d", usageInfo.TotalTokens) @@ -1356,6 +1405,78 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroclaude.Ki usageInfo.InputTokens, usageInfo.OutputTokens) } + case "meteringEvent": + // Handle metering events from Kiro API (usage billing information) + // Official format: { unit: string, unitPlural: string, usage: number } + if metering, ok := event["meteringEvent"].(map[string]interface{}); ok { + unit := "" + if u, ok := metering["unit"].(string); ok { + unit = u + } + usageVal := 0.0 + if u, ok := metering["usage"].(float64); ok { + usageVal = u + } + log.Infof("kiro: parseEventStream received meteringEvent: usage=%.2f %s", usageVal, unit) + // Store metering info for potential billing/statistics purposes + // Note: This is separate from token counts - it's AWS billing units + } else { + // Try direct fields + unit := "" + if u, ok := event["unit"].(string); ok { + unit = u + } + usageVal := 0.0 + if u, ok := event["usage"].(float64); ok { + usageVal = u + } + if unit != "" || usageVal > 0 { + log.Infof("kiro: parseEventStream received meteringEvent (direct): usage=%.2f %s", usageVal, unit) + } + } + + case "error", "exception", "internalServerException", "invalidStateEvent": + // Handle error events from Kiro API stream + errMsg := "" + errType := eventType + + // Try to extract error message from various formats + if msg, ok := event["message"].(string); ok { + errMsg = msg + } else if errObj, ok := event[eventType].(map[string]interface{}); ok { + if msg, ok := errObj["message"].(string); ok { + errMsg = msg + } + if t, ok := errObj["type"].(string); ok { + errType = t + } + } else if errObj, ok := event["error"].(map[string]interface{}); ok { + if msg, ok := errObj["message"].(string); ok { + errMsg = msg + } + if t, ok := errObj["type"].(string); ok { + errType = t + } + } + + // Check for specific error reasons + if reason, ok := event["reason"].(string); ok { + errMsg = fmt.Sprintf("%s (reason: %s)", errMsg, reason) + } + + log.Errorf("kiro: parseEventStream received error event: type=%s, message=%s", errType, errMsg) + + // For invalidStateEvent, we may want to continue processing other events + if eventType == "invalidStateEvent" { + log.Warnf("kiro: invalidStateEvent received, continuing stream processing") + continue + } + + // For other errors, return the error + if errMsg != "" { + return "", nil, usageInfo, stopReason, fmt.Errorf("kiro API error (%s): %s", errType, errMsg) + } + default: // Check for contextUsagePercentage in any event if ctxPct, ok := event["contextUsagePercentage"].(float64); ok { @@ -1693,30 +1814,14 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // IMPORTANT: This must persist across all TranslateStream calls var translatorParam any - // Thinking mode state tracking - based on amq2api implementation - // Tracks whether we're inside a block and handles partial tags - inThinkBlock := false - pendingStartTagChars := 0 // Number of chars that might be start of - pendingEndTagChars := 0 // Number of chars that might be start of - isThinkingBlockOpen := false // Track if thinking content block is open + // Thinking mode state tracking - tag-based parsing for tags in content + inThinkBlock := false // Whether we're currently inside a block + isThinkingBlockOpen := false // Track if thinking content block SSE event is open thinkingBlockIndex := -1 // Index of the thinking content block - var accumulatedThinkingContent strings.Builder // Accumulate thinking content for signature generation + var accumulatedThinkingContent strings.Builder // Accumulate thinking content for token counting - // Code block state tracking for heuristic thinking tag parsing - // When inside a markdown code block, tags should NOT be parsed - // This prevents false positives when the model outputs code examples containing these tags - inCodeBlock := false - codeFenceType := "" // Track which fence type opened the block ("```" or "~~~") - - // Inline code state tracking - when inside backticks, don't parse thinking tags - // This handles cases like `` being discussed in text - inInlineCode := false - - // Track if we've seen any non-whitespace content before a thinking tag - // Real thinking blocks from Kiro always start at the very beginning of the response - // If we see content before , subsequent tags are likely discussion text - hasSeenNonThinkingContent := false - thinkingBlockCompleted := false // Track if we've already completed a thinking block + // Buffer for handling partial tag matches at chunk boundaries + var pendingContent strings.Builder // Buffer content that might be part of a tag // Pre-calculate input tokens from request if possible // Kiro uses Claude format, so try Claude format first, then OpenAI format, then fallback @@ -1820,57 +1925,10 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out currentToolUse = nil } - // Flush any pending tag characters at EOF - // These are partial tag prefixes that were held back waiting for more data - // Since no more data is coming, output them as regular text - var pendingText string - if pendingStartTagChars > 0 { - pendingText = kirocommon.ThinkingStartTag[:pendingStartTagChars] - log.Debugf("kiro: flushing pending start tag chars at EOF: %q", pendingText) - pendingStartTagChars = 0 - } - if pendingEndTagChars > 0 { - pendingText += kirocommon.ThinkingEndTag[:pendingEndTagChars] - log.Debugf("kiro: flushing pending end tag chars at EOF: %q", pendingText) - pendingEndTagChars = 0 - } - - // Output pending text if any - if pendingText != "" { - // If we're in a thinking block, output as thinking content - if inThinkBlock && isThinkingBlockOpen { - thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(pendingText, thinkingBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - // Accumulate thinking content for signature generation - accumulatedThinkingContent.WriteString(pendingText) - } else { - // Output as regular text - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - - claudeEvent := kiroclaude.BuildClaudeStreamEvent(pendingText, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - } + // DISABLED: Tag-based pending character flushing + // This code block was used for tag-based thinking detection which has been + // replaced by reasoningContentEvent handling. No pending tag chars to flush. + // Original code preserved in git history. break } @@ -1954,6 +2012,76 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out log.Debugf("kiro: streamToChannel found stopReason in messageStopEvent: %s", upstreamStopReason) } + case "meteringEvent": + // Handle metering events from Kiro API (usage billing information) + // Official format: { unit: string, unitPlural: string, usage: number } + if metering, ok := event["meteringEvent"].(map[string]interface{}); ok { + unit := "" + if u, ok := metering["unit"].(string); ok { + unit = u + } + usageVal := 0.0 + if u, ok := metering["usage"].(float64); ok { + usageVal = u + } + upstreamCreditUsage = usageVal + hasUpstreamUsage = true + log.Infof("kiro: streamToChannel received meteringEvent: usage=%.4f %s", usageVal, unit) + } else { + // Try direct fields (event is meteringEvent itself) + if unit, ok := event["unit"].(string); ok { + if usage, ok := event["usage"].(float64); ok { + upstreamCreditUsage = usage + hasUpstreamUsage = true + log.Infof("kiro: streamToChannel received meteringEvent (direct): usage=%.4f %s", usage, unit) + } + } + } + + case "error", "exception", "internalServerException": + // Handle error events from Kiro API stream + errMsg := "" + errType := eventType + + // Try to extract error message from various formats + if msg, ok := event["message"].(string); ok { + errMsg = msg + } else if errObj, ok := event[eventType].(map[string]interface{}); ok { + if msg, ok := errObj["message"].(string); ok { + errMsg = msg + } + if t, ok := errObj["type"].(string); ok { + errType = t + } + } else if errObj, ok := event["error"].(map[string]interface{}); ok { + if msg, ok := errObj["message"].(string); ok { + errMsg = msg + } + } + + log.Errorf("kiro: streamToChannel received error event: type=%s, message=%s", errType, errMsg) + + // Send error to the stream and exit + if errMsg != "" { + out <- cliproxyexecutor.StreamChunk{ + Err: fmt.Errorf("kiro API error (%s): %s", errType, errMsg), + } + return + } + + case "invalidStateEvent": + // Handle invalid state events - log and continue (non-fatal) + errMsg := "" + if msg, ok := event["message"].(string); ok { + errMsg = msg + } else if stateEvent, ok := event["invalidStateEvent"].(map[string]interface{}); ok { + if msg, ok := stateEvent["message"].(string); ok { + errMsg = msg + } + } + log.Warnf("kiro: streamToChannel received invalidStateEvent: %s, continuing", errMsg) + continue + default: // Check for upstream usage events from Kiro API // Format: {"unit":"credit","unitPlural":"credits","usage":1.458} @@ -2108,268 +2236,24 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out lastUsageUpdateLen = accumulatedContent.Len() lastUsageUpdateTime = time.Now() - } - - // Process content with thinking tag detection - based on amq2api implementation - // This handles and tags that may span across chunks - remaining := contentDelta - - // If we have pending start tag chars from previous chunk, prepend them - if pendingStartTagChars > 0 { - remaining = kirocommon.ThinkingStartTag[:pendingStartTagChars] + remaining - pendingStartTagChars = 0 - } - - // If we have pending end tag chars from previous chunk, prepend them - if pendingEndTagChars > 0 { - remaining = kirocommon.ThinkingEndTag[:pendingEndTagChars] + remaining - pendingEndTagChars = 0 - } - - for len(remaining) > 0 { - // CRITICAL FIX: Only parse tags when thinking mode was enabled in the request. - // When thinking is NOT enabled, tags in responses should be treated as - // regular text content, not as thinking blocks. This prevents normal text content - // from being incorrectly parsed as thinking when the model outputs tags - // without the user requesting thinking mode. - if !thinkingEnabled { - // Thinking not enabled - emit all content as regular text without parsing tags - if remaining != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - - claudeEvent := kiroclaude.BuildClaudeStreamEvent(remaining, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - break // Exit the for loop - all content processed as text } - // HEURISTIC FIX: Track code block and inline code state to avoid parsing tags - // inside code contexts. When the model outputs code examples containing these tags, - // they should be treated as text. - if !inThinkBlock { - // Check for inline code backticks first (higher priority than code fences) - // This handles cases like `` being discussed in text - backtickIdx := strings.Index(remaining, kirocommon.InlineCodeMarker) - thinkingIdx := strings.Index(remaining, kirocommon.ThinkingStartTag) - - // If backtick comes before thinking tag, handle inline code - if backtickIdx >= 0 && (thinkingIdx < 0 || backtickIdx < thinkingIdx) { - if inInlineCode { - // Closing backtick - emit content up to and including backtick, exit inline code - textToEmit := remaining[:backtickIdx+1] - if textToEmit != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(textToEmit, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - remaining = remaining[backtickIdx+1:] - inInlineCode = false - continue - } else { - // Opening backtick - emit content before backtick, enter inline code - textToEmit := remaining[:backtickIdx+1] - if textToEmit != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(textToEmit, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - remaining = remaining[backtickIdx+1:] - inInlineCode = true - continue - } - } - - // If inside inline code, emit all content as text (don't parse thinking tags) - if inInlineCode { - if remaining != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(remaining, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - break // Exit loop - remaining content is inside inline code - } - - // Check for code fence markers (``` or ~~~) to toggle code block state - fenceIdx := strings.Index(remaining, kirocommon.CodeFenceMarker) - altFenceIdx := strings.Index(remaining, kirocommon.AltCodeFenceMarker) - - // Find the earliest fence marker - earliestFenceIdx := -1 - earliestFenceType := "" - if fenceIdx >= 0 && (altFenceIdx < 0 || fenceIdx < altFenceIdx) { - earliestFenceIdx = fenceIdx - earliestFenceType = kirocommon.CodeFenceMarker - } else if altFenceIdx >= 0 { - earliestFenceIdx = altFenceIdx - earliestFenceType = kirocommon.AltCodeFenceMarker - } - - if earliestFenceIdx >= 0 { - // Check if this fence comes before any thinking tag - thinkingIdx := strings.Index(remaining, kirocommon.ThinkingStartTag) - if inCodeBlock { - // Inside code block - check if this fence closes it - if earliestFenceType == codeFenceType { - // This fence closes the code block - // Emit content up to and including the fence as text - textToEmit := remaining[:earliestFenceIdx+len(earliestFenceType)] - if textToEmit != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(textToEmit, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - remaining = remaining[earliestFenceIdx+len(earliestFenceType):] - inCodeBlock = false - codeFenceType = "" - log.Debugf("kiro: exited code block") - continue - } - } else if thinkingIdx < 0 || earliestFenceIdx < thinkingIdx { - // Not in code block, and fence comes before thinking tag (or no thinking tag) - // Emit content up to and including the fence as text, then enter code block - textToEmit := remaining[:earliestFenceIdx+len(earliestFenceType)] - if textToEmit != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(textToEmit, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - remaining = remaining[earliestFenceIdx+len(earliestFenceType):] - inCodeBlock = true - codeFenceType = earliestFenceType - log.Debugf("kiro: entered code block with fence: %s", earliestFenceType) - continue - } - } - - // If inside code block, emit all content as text (don't parse thinking tags) - if inCodeBlock { - if remaining != "" { - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - claudeEvent := kiroclaude.BuildClaudeStreamEvent(remaining, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - break // Exit loop - all remaining content is inside code block - } - } + // TAG-BASED THINKING PARSING: Parse tags from content + // Combine pending content with new content for processing + pendingContent.WriteString(contentDelta) + processContent := pendingContent.String() + pendingContent.Reset() + // Process content looking for thinking tags + for len(processContent) > 0 { if inThinkBlock { - // Inside thinking block - look for end tag - // CRITICAL FIX: Skip tags that are not the real end tag - // This prevents false positives when thinking content discusses these tags - // Pass current code block/inline code state for accurate detection - endIdx := findRealThinkingEndTag(remaining, inCodeBlock, inInlineCode) - + // We're inside a thinking block, look for + endIdx := strings.Index(processContent, kirocommon.ThinkingEndTag) if endIdx >= 0 { - // Found end tag - emit any content before end tag, then close block - thinkContent := remaining[:endIdx] - if thinkContent != "" { - // TRUE STREAMING: Emit thinking content immediately - // Start thinking block if not open + // Found end tag - emit thinking content before the tag + thinkingText := processContent[:endIdx] + if thinkingText != "" { + // Ensure thinking block is open if !isThinkingBlockOpen { contentBlockIndex++ thinkingBlockIndex = contentBlockIndex @@ -2382,22 +2266,16 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } } - - // Send thinking delta immediately - thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(thinkContent, thinkingBlockIndex) + // Send thinking delta + thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(thinkingText, thinkingBlockIndex) sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} } } - // Accumulate thinking content for signature generation - accumulatedThinkingContent.WriteString(thinkContent) + accumulatedThinkingContent.WriteString(thinkingText) } - - // Note: Partial tag handling is done via pendingEndTagChars - // When the next chunk arrives, the partial tag will be reconstructed - // Close thinking block if isThinkingBlockOpen { blockStop := kiroclaude.BuildClaudeThinkingBlockStopEvent(thinkingBlockIndex) @@ -2408,84 +2286,68 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } isThinkingBlockOpen = false - accumulatedThinkingContent.Reset() // Reset for potential next thinking block } - inThinkBlock = false - thinkingBlockCompleted = true // Mark that we've completed a thinking block - remaining = remaining[endIdx+len(kirocommon.ThinkingEndTag):] - log.Debugf("kiro: exited thinking block, subsequent tags will be treated as text") + processContent = processContent[endIdx+len(kirocommon.ThinkingEndTag):] + log.Debugf("kiro: closed thinking block, remaining content: %d chars", len(processContent)) } else { - // No end tag found - TRUE STREAMING: emit content immediately - // Only save potential partial tag length for next iteration - pendingEnd := kiroclaude.PendingTagSuffix(remaining, kirocommon.ThinkingEndTag) - - // Calculate content to emit immediately (excluding potential partial tag) - var contentToEmit string - if pendingEnd > 0 { - contentToEmit = remaining[:len(remaining)-pendingEnd] - // Save partial tag length for next iteration (will be reconstructed from thinkingEndTag) - pendingEndTagChars = pendingEnd - } else { - contentToEmit = remaining + // No end tag found - check for partial match at end + partialMatch := false + for i := 1; i < len(kirocommon.ThinkingEndTag) && i <= len(processContent); i++ { + if strings.HasSuffix(processContent, kirocommon.ThinkingEndTag[:i]) { + // Possible partial tag at end, buffer it + pendingContent.WriteString(processContent[len(processContent)-i:]) + processContent = processContent[:len(processContent)-i] + partialMatch = true + break + } } - - // TRUE STREAMING: Emit thinking content immediately - if contentToEmit != "" { - // Start thinking block if not open - if !isThinkingBlockOpen { - contentBlockIndex++ - thinkingBlockIndex = contentBlockIndex - isThinkingBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + if !partialMatch || len(processContent) > 0 { + // Emit all as thinking content + if processContent != "" { + if !isThinkingBlockOpen { + contentBlockIndex++ + thinkingBlockIndex = contentBlockIndex + isThinkingBlockOpen = true + blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(processContent, thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} } } + accumulatedThinkingContent.WriteString(processContent) } - - // Send thinking delta immediately - TRUE STREAMING! - thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(contentToEmit, thinkingBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - // Accumulate thinking content for signature generation - accumulatedThinkingContent.WriteString(contentToEmit) } - - remaining = "" + processContent = "" } } else { - // Outside thinking block - look for start tag - // CRITICAL FIX: Only parse tags at the very beginning of the response - // or if we haven't completed a thinking block yet. - // After a thinking block is completed, subsequent tags are likely - // discussion text (e.g., "Kiro returns `` tags") and should NOT be parsed. - startIdx := -1 - if !thinkingBlockCompleted && !hasSeenNonThinkingContent { - startIdx = strings.Index(remaining, kirocommon.ThinkingStartTag) - // If there's non-whitespace content before the tag, it's not a real thinking block - if startIdx > 0 { - textBefore := remaining[:startIdx] - if strings.TrimSpace(textBefore) != "" { - // There's real content before the tag - this is discussion text, not thinking - hasSeenNonThinkingContent = true - startIdx = -1 - log.Debugf("kiro: found tag after non-whitespace content, treating as text") - } - } - } + // Not in thinking block, look for + startIdx := strings.Index(processContent, kirocommon.ThinkingStartTag) if startIdx >= 0 { - // Found start tag - emit text before it and switch to thinking mode - textBefore := remaining[:startIdx] + // Found start tag - emit text content before the tag + textBefore := processContent[:startIdx] if textBefore != "" { - // Only whitespace before thinking tag is allowed - // Start text content block if needed + // Close thinking block if open + if isThinkingBlockOpen { + blockStop := kiroclaude.BuildClaudeThinkingBlockStopEvent(thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + isThinkingBlockOpen = false + } + // Ensure text block is open if !isTextBlockOpen { contentBlockIndex++ isTextBlockOpen = true @@ -2497,7 +2359,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } } - + // Send text delta claudeEvent := kiroclaude.BuildClaudeStreamEvent(textBefore, contentBlockIndex) sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) for _, chunk := range sseData { @@ -2506,8 +2368,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } } - - // Close text block before starting thinking block + // Close text block before entering thinking if isTextBlockOpen { blockStop := kiroclaude.BuildClaudeContentBlockStopEvent(contentBlockIndex) sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) @@ -2518,26 +2379,24 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } isTextBlockOpen = false } - inThinkBlock = true - remaining = remaining[startIdx+len(kirocommon.ThinkingStartTag):] + processContent = processContent[startIdx+len(kirocommon.ThinkingStartTag):] log.Debugf("kiro: entered thinking block") } else { - // No start tag found - check for partial start tag at buffer end - // Only check for partial tags if we haven't completed a thinking block yet - pendingStart := 0 - if !thinkingBlockCompleted && !hasSeenNonThinkingContent { - pendingStart = kiroclaude.PendingTagSuffix(remaining, kirocommon.ThinkingStartTag) + // No start tag found - check for partial match at end + partialMatch := false + for i := 1; i < len(kirocommon.ThinkingStartTag) && i <= len(processContent); i++ { + if strings.HasSuffix(processContent, kirocommon.ThinkingStartTag[:i]) { + // Possible partial tag at end, buffer it + pendingContent.WriteString(processContent[len(processContent)-i:]) + processContent = processContent[:len(processContent)-i] + partialMatch = true + break + } } - if pendingStart > 0 { - // Emit text except potential partial tag - textToEmit := remaining[:len(remaining)-pendingStart] - if textToEmit != "" { - // Mark that we've seen non-thinking content - if strings.TrimSpace(textToEmit) != "" { - hasSeenNonThinkingContent = true - } - // Start text content block if needed + if !partialMatch || len(processContent) > 0 { + // Emit all as text content + if processContent != "" { if !isTextBlockOpen { contentBlockIndex++ isTextBlockOpen = true @@ -2549,8 +2408,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } } - - claudeEvent := kiroclaude.BuildClaudeStreamEvent(textToEmit, contentBlockIndex) + claudeEvent := kiroclaude.BuildClaudeStreamEvent(processContent, contentBlockIndex) sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) for _, chunk := range sseData { if chunk != "" { @@ -2558,41 +2416,11 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } } - pendingStartTagChars = pendingStart - remaining = "" - } else { - // No partial tag - emit all as text - if remaining != "" { - // Mark that we've seen non-thinking content - if strings.TrimSpace(remaining) != "" { - hasSeenNonThinkingContent = true - } - // Start text content block if needed - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - - claudeEvent := kiroclaude.BuildClaudeStreamEvent(remaining, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } - } - remaining = "" } + processContent = "" } } - } + } } // Handle tool uses in response (with deduplication) @@ -2658,6 +2486,80 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } + case "reasoningContentEvent": + // Handle official reasoningContentEvent from Kiro API + // This replaces tag-based thinking detection with the proper event type + // Official format: { text: string, signature?: string, redactedContent?: base64 } + var thinkingText string + var signature string + + if re, ok := event["reasoningContentEvent"].(map[string]interface{}); ok { + if text, ok := re["text"].(string); ok { + thinkingText = text + } + if sig, ok := re["signature"].(string); ok { + signature = sig + if len(sig) > 20 { + log.Debugf("kiro: reasoningContentEvent has signature: %s...", sig[:20]) + } else { + log.Debugf("kiro: reasoningContentEvent has signature: %s", sig) + } + } + } else { + // Try direct fields + if text, ok := event["text"].(string); ok { + thinkingText = text + } + if sig, ok := event["signature"].(string); ok { + signature = sig + } + } + + if thinkingText != "" { + // Close text block if open before starting thinking block + if isTextBlockOpen && contentBlockIndex >= 0 { + blockStop := kiroclaude.BuildClaudeContentBlockStopEvent(contentBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + isTextBlockOpen = false + } + + // Start thinking block if not already open + if !isThinkingBlockOpen { + contentBlockIndex++ + thinkingBlockIndex = contentBlockIndex + isThinkingBlockOpen = true + blockStart := kiroclaude.BuildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + // Send thinking content + thinkingEvent := kiroclaude.BuildClaudeThinkingDeltaEvent(thinkingText, thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + + // Accumulate for token counting + accumulatedThinkingContent.WriteString(thinkingText) + log.Debugf("kiro: received reasoningContentEvent, text length: %d, has signature: %v", len(thinkingText), signature != "") + } + + // Note: We don't close the thinking block here - it will be closed when we see + // the next assistantResponseEvent or at the end of the stream + _ = signature // Signature can be used for verification if needed + case "toolUseEvent": // Handle dedicated tool use events with input buffering completedToolUses, newState := kiroclaude.ProcessToolUseEvent(event, currentToolUse, processedIDs) @@ -2721,17 +2623,71 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out totalUsage.OutputTokens = int64(outputTokens) } - case "messageMetadataEvent": - // Handle message metadata events which may contain token counts - if metadata, ok := event["messageMetadataEvent"].(map[string]interface{}); ok { + case "messageMetadataEvent", "metadataEvent": + // Handle message metadata events which contain token counts + // Official format: { tokenUsage: { outputTokens, totalTokens, uncachedInputTokens, cacheReadInputTokens, cacheWriteInputTokens, contextUsagePercentage } } + var metadata map[string]interface{} + if m, ok := event["messageMetadataEvent"].(map[string]interface{}); ok { + metadata = m + } else if m, ok := event["metadataEvent"].(map[string]interface{}); ok { + metadata = m + } else { + metadata = event // event itself might be the metadata + } + + // Check for nested tokenUsage object (official format) + if tokenUsage, ok := metadata["tokenUsage"].(map[string]interface{}); ok { + // outputTokens - precise output token count + if outputTokens, ok := tokenUsage["outputTokens"].(float64); ok { + totalUsage.OutputTokens = int64(outputTokens) + hasUpstreamUsage = true + log.Infof("kiro: streamToChannel found precise outputTokens in tokenUsage: %d", totalUsage.OutputTokens) + } + // totalTokens - precise total token count + if totalTokens, ok := tokenUsage["totalTokens"].(float64); ok { + totalUsage.TotalTokens = int64(totalTokens) + log.Infof("kiro: streamToChannel found precise totalTokens in tokenUsage: %d", totalUsage.TotalTokens) + } + // uncachedInputTokens - input tokens not from cache + if uncachedInputTokens, ok := tokenUsage["uncachedInputTokens"].(float64); ok { + totalUsage.InputTokens = int64(uncachedInputTokens) + hasUpstreamUsage = true + log.Infof("kiro: streamToChannel found uncachedInputTokens in tokenUsage: %d", totalUsage.InputTokens) + } + // cacheReadInputTokens - tokens read from cache + if cacheReadTokens, ok := tokenUsage["cacheReadInputTokens"].(float64); ok { + // Add to input tokens if we have uncached tokens, otherwise use as input + if totalUsage.InputTokens > 0 { + totalUsage.InputTokens += int64(cacheReadTokens) + } else { + totalUsage.InputTokens = int64(cacheReadTokens) + } + hasUpstreamUsage = true + log.Debugf("kiro: streamToChannel found cacheReadInputTokens in tokenUsage: %d", int64(cacheReadTokens)) + } + // contextUsagePercentage - can be used as fallback for input token estimation + if ctxPct, ok := tokenUsage["contextUsagePercentage"].(float64); ok { + upstreamContextPercentage = ctxPct + log.Debugf("kiro: streamToChannel found contextUsagePercentage in tokenUsage: %.2f%%", ctxPct) + } + } + + // Fallback: check for direct fields in metadata (legacy format) + if totalUsage.InputTokens == 0 { if inputTokens, ok := metadata["inputTokens"].(float64); ok { totalUsage.InputTokens = int64(inputTokens) + hasUpstreamUsage = true log.Debugf("kiro: streamToChannel found inputTokens in messageMetadataEvent: %d", totalUsage.InputTokens) } + } + if totalUsage.OutputTokens == 0 { if outputTokens, ok := metadata["outputTokens"].(float64); ok { totalUsage.OutputTokens = int64(outputTokens) + hasUpstreamUsage = true log.Debugf("kiro: streamToChannel found outputTokens in messageMetadataEvent: %d", totalUsage.OutputTokens) } + } + if totalUsage.TotalTokens == 0 { if totalTokens, ok := metadata["totalTokens"].(float64); ok { totalUsage.TotalTokens = int64(totalTokens) log.Debugf("kiro: streamToChannel found totalTokens in messageMetadataEvent: %d", totalUsage.TotalTokens) diff --git a/internal/translator/kiro/claude/kiro_claude_request.go b/internal/translator/kiro/claude/kiro_claude_request.go index e3e333d1..402591e7 100644 --- a/internal/translator/kiro/claude/kiro_claude_request.go +++ b/internal/translator/kiro/claude/kiro_claude_request.go @@ -222,20 +222,19 @@ func BuildKiroPayload(claudeBody []byte, modelID, profileArn, origin string, isA kiroTools := convertClaudeToolsToKiro(tools) // Thinking mode implementation: - // Kiro API doesn't accept max_tokens for thinking. Instead, thinking mode is enabled - // by injecting and tags into the system prompt. - // We use a fixed max_thinking_length value since Kiro handles the actual budget internally. + // Kiro API supports official thinking/reasoning mode via tag. + // When set to "enabled", Kiro returns reasoning content as official reasoningContentEvent + // rather than inline tags in assistantResponseEvent. + // We use a high max_thinking_length to allow extensive reasoning. if thinkingEnabled { - thinkingHint := `interleaved -200000 - -IMPORTANT: You MUST use ... tags to show your reasoning process before providing your final response. Think step by step inside the thinking tags.` + thinkingHint := `enabled +200000` if systemPrompt != "" { systemPrompt = thinkingHint + "\n\n" + systemPrompt } else { systemPrompt = thinkingHint } - log.Infof("kiro: injected thinking prompt, has_tools: %v", len(kiroTools) > 0) + log.Infof("kiro: injected thinking prompt (official mode), has_tools: %v", len(kiroTools) > 0) } // Process messages and build history diff --git a/internal/translator/kiro/openai/kiro_openai_request.go b/internal/translator/kiro/openai/kiro_openai_request.go index e4f3e767..f58b50cf 100644 --- a/internal/translator/kiro/openai/kiro_openai_request.go +++ b/internal/translator/kiro/openai/kiro_openai_request.go @@ -231,20 +231,19 @@ func BuildKiroPayloadFromOpenAI(openaiBody []byte, modelID, profileArn, origin s kiroTools := convertOpenAIToolsToKiro(tools) // Thinking mode implementation: - // Kiro API doesn't accept max_tokens for thinking. Instead, thinking mode is enabled - // by injecting and tags into the system prompt. - // We use a fixed max_thinking_length value since Kiro handles the actual budget internally. + // Kiro API supports official thinking/reasoning mode via tag. + // When set to "enabled", Kiro returns reasoning content as official reasoningContentEvent + // rather than inline tags in assistantResponseEvent. + // We use a high max_thinking_length to allow extensive reasoning. if thinkingEnabled { - thinkingHint := `interleaved -200000 - -IMPORTANT: You MUST use ... tags to show your reasoning process before providing your final response. Think step by step inside the thinking tags.` + thinkingHint := `enabled +200000` if systemPrompt != "" { systemPrompt = thinkingHint + "\n\n" + systemPrompt } else { systemPrompt = thinkingHint } - log.Debugf("kiro-openai: injected thinking prompt") + log.Debugf("kiro-openai: injected thinking prompt (official mode)") } // Process messages and build history