diff --git a/internal/cache/codex_reasoning_replay_cache.go b/internal/cache/codex_reasoning_replay_cache.go new file mode 100644 index 000000000..820f7f1d1 --- /dev/null +++ b/internal/cache/codex_reasoning_replay_cache.go @@ -0,0 +1,253 @@ +package cache + +import ( + "sort" + "strings" + "sync" + "time" + + "github.com/router-for-me/CLIProxyAPI/v7/internal/signature" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const ( + // CodexReasoningReplayCacheTTL limits how long encrypted reasoning replay + // items stay in process memory. + CodexReasoningReplayCacheTTL = 1 * time.Hour + + // CodexReasoningReplayCacheMaxEntries bounds process memory for replay + // continuity. Oldest entries are evicted first. + CodexReasoningReplayCacheMaxEntries = 10240 + + // CodexReasoningReplayCacheEvictBatchSize leaves headroom after the cache + // reaches capacity so high write volume does not rescan the map every turn. + CodexReasoningReplayCacheEvictBatchSize = 128 +) + +type codexReasoningReplayEntry struct { + Items [][]byte + Timestamp time.Time +} + +var ( + codexReasoningReplayMu sync.Mutex + codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry) +) + +// CacheCodexReasoningReplayItem stores a final GPT/Codex reasoning item for +// stateless replay. The stored item is normalized to the minimal shape accepted +// by Responses input replay. +func CacheCodexReasoningReplayItem(modelName, sessionKey string, item []byte) bool { + return CacheCodexReasoningReplayItems(modelName, sessionKey, [][]byte{item}) +} + +// CacheCodexReasoningReplayItems stores the final GPT/Codex assistant output +// items needed to replay a stateless next turn. +func CacheCodexReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool { + key := codexReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return false + } + normalized, ok := normalizeCodexReasoningReplayItems(items) + if !ok { + return false + } + + cacheCleanupOnce.Do(startCacheCleanup) + now := time.Now() + codexReasoningReplayMu.Lock() + defer codexReasoningReplayMu.Unlock() + codexReasoningReplayEntries[key] = codexReasoningReplayEntry{ + Items: normalized, + Timestamp: now, + } + if len(codexReasoningReplayEntries) > CodexReasoningReplayCacheMaxEntries { + evictOldestCodexReasoningReplayEntries(CodexReasoningReplayCacheEvictBatchSize) + } + return true +} + +// GetCodexReasoningReplayItem retrieves a normalized reasoning replay item. +func GetCodexReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) { + items, ok := GetCodexReasoningReplayItems(modelName, sessionKey) + if !ok || len(items) == 0 { + return nil, false + } + return items[0], true +} + +// GetCodexReasoningReplayItems retrieves normalized assistant output items. +func GetCodexReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) { + key := codexReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return nil, false + } + + cacheCleanupOnce.Do(startCacheCleanup) + now := time.Now() + codexReasoningReplayMu.Lock() + defer codexReasoningReplayMu.Unlock() + entry, ok := codexReasoningReplayEntries[key] + if !ok { + return nil, false + } + if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL { + delete(codexReasoningReplayEntries, key) + return nil, false + } + entry.Timestamp = now + codexReasoningReplayEntries[key] = entry + return cloneCodexReasoningReplayItems(entry.Items), true +} + +// DeleteCodexReasoningReplayItem removes one replay item after upstream rejects +// it or the caller otherwise knows it is stale. +func DeleteCodexReasoningReplayItem(modelName, sessionKey string) { + key := codexReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return + } + codexReasoningReplayMu.Lock() + delete(codexReasoningReplayEntries, key) + codexReasoningReplayMu.Unlock() +} + +// ClearCodexReasoningReplayCache clears all Codex reasoning replay state. +func ClearCodexReasoningReplayCache() { + codexReasoningReplayMu.Lock() + codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry) + codexReasoningReplayMu.Unlock() +} + +func codexReasoningReplayCacheKey(modelName, sessionKey string) string { + modelName = strings.TrimSpace(modelName) + sessionKey = strings.TrimSpace(sessionKey) + if modelName == "" || sessionKey == "" { + return "" + } + // The session key is the continuity boundary. Keep this independent from + // the selected upstream Codex credential so auth failover can preserve replay. + return strings.Join([]string{"codex-reasoning-replay", modelName, sessionKey}, "\x00") +} + +func normalizeCodexReasoningReplayItems(items [][]byte) ([][]byte, bool) { + normalized := make([][]byte, 0, len(items)) + for _, item := range items { + normalizedItem, ok := normalizeCodexReasoningReplayItem(item) + if ok { + normalized = append(normalized, normalizedItem) + } + } + return normalized, len(normalized) > 0 +} + +func normalizeCodexReasoningReplayItem(item []byte) ([]byte, bool) { + itemResult := gjson.ParseBytes(item) + switch strings.TrimSpace(itemResult.Get("type").String()) { + case "reasoning": + return normalizeCodexReasoningReplayReasoningItem(itemResult) + case "function_call": + return normalizeCodexReasoningReplayFunctionCallItem(itemResult) + case "custom_tool_call": + return normalizeCodexReasoningReplayCustomToolCallItem(itemResult) + default: + return nil, false + } +} + +func normalizeCodexReasoningReplayReasoningItem(itemResult gjson.Result) ([]byte, bool) { + encryptedContentResult := itemResult.Get("encrypted_content") + if encryptedContentResult.Type != gjson.String { + return nil, false + } + encryptedContent := encryptedContentResult.String() + if encryptedContent != strings.TrimSpace(encryptedContent) { + return nil, false + } + if _, err := signature.InspectGPTReasoningSignature(encryptedContent); err != nil { + return nil, false + } + + normalized := []byte(`{"type":"reasoning","summary":[],"content":null}`) + normalized, _ = sjson.SetBytes(normalized, "encrypted_content", encryptedContent) + return normalized, true +} + +func normalizeCodexReasoningReplayFunctionCallItem(itemResult gjson.Result) ([]byte, bool) { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + name := strings.TrimSpace(itemResult.Get("name").String()) + arguments := itemResult.Get("arguments") + if callID == "" || name == "" || arguments.Type != gjson.String { + return nil, false + } + + normalized := []byte(`{"type":"function_call"}`) + normalized, _ = sjson.SetBytes(normalized, "call_id", callID) + normalized, _ = sjson.SetBytes(normalized, "name", name) + normalized, _ = sjson.SetBytes(normalized, "arguments", arguments.String()) + return normalized, true +} + +func normalizeCodexReasoningReplayCustomToolCallItem(itemResult gjson.Result) ([]byte, bool) { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + name := strings.TrimSpace(itemResult.Get("name").String()) + input := itemResult.Get("input") + if callID == "" || name == "" || !input.Exists() { + return nil, false + } + + normalized := []byte(`{"type":"custom_tool_call","status":"completed"}`) + if status := strings.TrimSpace(itemResult.Get("status").String()); status != "" { + normalized, _ = sjson.SetBytes(normalized, "status", status) + } + normalized, _ = sjson.SetBytes(normalized, "call_id", callID) + normalized, _ = sjson.SetBytes(normalized, "name", name) + if input.Type == gjson.String { + normalized, _ = sjson.SetBytes(normalized, "input", input.String()) + } else { + normalized, _ = sjson.SetRawBytes(normalized, "input", []byte(input.Raw)) + } + return normalized, true +} + +func cloneCodexReasoningReplayItems(items [][]byte) [][]byte { + cloned := make([][]byte, 0, len(items)) + for _, item := range items { + cloned = append(cloned, append([]byte(nil), item...)) + } + return cloned +} + +func evictOldestCodexReasoningReplayEntries(count int) { + if count <= 0 || len(codexReasoningReplayEntries) == 0 { + return + } + type candidate struct { + key string + timestamp time.Time + } + candidates := make([]candidate, 0, len(codexReasoningReplayEntries)) + for key, entry := range codexReasoningReplayEntries { + candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp}) + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].timestamp.Before(candidates[j].timestamp) + }) + if count > len(candidates) { + count = len(candidates) + } + for i := 0; i < count; i++ { + delete(codexReasoningReplayEntries, candidates[i].key) + } +} + +func purgeExpiredCodexReasoningReplayCache(now time.Time) { + codexReasoningReplayMu.Lock() + for key, entry := range codexReasoningReplayEntries { + if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL { + delete(codexReasoningReplayEntries, key) + } + } + codexReasoningReplayMu.Unlock() +} diff --git a/internal/cache/codex_reasoning_replay_cache_test.go b/internal/cache/codex_reasoning_replay_cache_test.go new file mode 100644 index 000000000..cc43ed414 --- /dev/null +++ b/internal/cache/codex_reasoning_replay_cache_test.go @@ -0,0 +1,73 @@ +package cache + +import ( + "encoding/base64" + "fmt" + "testing" +) + +func validCodexReasoningReplayEncryptedContentForTest(seed byte) string { + payload := make([]byte, 1+8+16+16+32) + payload[0] = 0x80 + for i := 9; i < len(payload); i++ { + payload[i] = seed + byte(i) + } + return base64.RawURLEncoding.EncodeToString(payload) +} + +func TestCodexReasoningReplayCacheRejectsInvalidItems(t *testing.T) { + ClearCodexReasoningReplayCache() + t.Cleanup(ClearCodexReasoningReplayCache) + + if CacheCodexReasoningReplayItem("gpt-5.4", "session", []byte(`{"type":"reasoning","encrypted_content":"bad","summary":[]}`)) { + t.Fatal("invalid encrypted_content should not be cached") + } + if _, ok := GetCodexReasoningReplayItem("gpt-5.4", "session"); ok { + t.Fatal("invalid item was cached") + } +} + +func TestCodexReasoningReplayCacheScopesByModelAndSession(t *testing.T) { + ClearCodexReasoningReplayCache() + t.Cleanup(ClearCodexReasoningReplayCache) + + encryptedContent := validCodexReasoningReplayEncryptedContentForTest(7) + if !CacheCodexReasoningReplayItem("gpt-5.4", "session-a", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}`)) { + t.Fatal("valid item was not cached") + } + + if _, ok := GetCodexReasoningReplayItem("gpt-5.5", "session-a"); ok { + t.Fatal("cache should not hit across models") + } + if _, ok := GetCodexReasoningReplayItem("gpt-5.4", "session-b"); ok { + t.Fatal("cache should not hit across sessions") + } + + item, ok := GetCodexReasoningReplayItem("gpt-5.4", "session-a") + if !ok { + t.Fatal("cache miss for original model and session") + } + if string(item) != `{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}` { + t.Fatalf("normalized item = %s", string(item)) + } +} + +func TestCodexReasoningReplayCacheBatchEvictsWhenFull(t *testing.T) { + ClearCodexReasoningReplayCache() + t.Cleanup(ClearCodexReasoningReplayCache) + + encryptedContent := validCodexReasoningReplayEncryptedContentForTest(9) + item := []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"` + encryptedContent + `"}`) + for i := 0; i <= CodexReasoningReplayCacheMaxEntries; i++ { + if !CacheCodexReasoningReplayItem("gpt-5.4", fmt.Sprintf("session-%d", i), item) { + t.Fatalf("cache insert %d failed", i) + } + } + + codexReasoningReplayMu.Lock() + gotLen := len(codexReasoningReplayEntries) + codexReasoningReplayMu.Unlock() + if gotLen >= CodexReasoningReplayCacheMaxEntries { + t.Fatalf("cache entries = %d, want batch eviction below max %d", gotLen, CodexReasoningReplayCacheMaxEntries) + } +} diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go index fd2ccab7c..42020ae72 100644 --- a/internal/cache/signature_cache.go +++ b/internal/cache/signature_cache.go @@ -94,6 +94,7 @@ func purgeExpiredCaches() { } return true }) + purgeExpiredCodexReasoningReplayCache(now) } // CacheSignature stores a thinking signature for a given model group and text. diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index d3c3925ed..2b243db8a 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -4,17 +4,22 @@ import ( "bufio" "bytes" "context" + "crypto/sha256" + "encoding/hex" "fmt" "io" "net/http" + "regexp" "sort" "strings" "time" codexauth "github.com/router-for-me/CLIProxyAPI/v7/internal/auth/codex" + internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" "github.com/router-for-me/CLIProxyAPI/v7/internal/config" "github.com/router-for-me/CLIProxyAPI/v7/internal/misc" "github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps" + "github.com/router-for-me/CLIProxyAPI/v7/internal/signature" "github.com/router-for-me/CLIProxyAPI/v7/internal/thinking" "github.com/router-for-me/CLIProxyAPI/v7/internal/util" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" @@ -36,6 +41,7 @@ const ( ) var dataTag = []byte("data:") +var codexClaudeCodeSessionPattern = regexp.MustCompile(`_session_([a-f0-9-]+)$`) // Streamed Codex responses may emit response.output_item.done events while leaving // response.completed.response.output empty. Keep the stream path aligned with the @@ -101,6 +107,14 @@ func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][] } func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) { + streamErr, body, ok := codexTerminalStreamErr(eventData) + if !ok || !codexTerminalErrorIsContextLength(body) { + return statusErr{}, false + } + return streamErr, true +} + +func codexTerminalStreamErr(eventData []byte) (statusErr, []byte, bool) { eventType := gjson.GetBytes(eventData, "type").String() var body []byte switch eventType { @@ -115,15 +129,23 @@ func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) { body = codexTerminalErrorBody(eventData, "error") } default: - return statusErr{}, false + return statusErr{}, nil, false } if len(body) == 0 { - return statusErr{}, false + return statusErr{}, nil, false } - if !codexTerminalErrorIsContextLength(body) { - return statusErr{}, false + if !codexTerminalStreamErrShouldHandle(body) { + return statusErr{}, nil, false } - return newCodexStatusErr(http.StatusBadRequest, body), true + return newCodexStatusErr(http.StatusBadRequest, body), body, true +} + +func codexTerminalStreamErrShouldHandle(body []byte) bool { + if codexTerminalErrorIsContextLength(body) { + return true + } + code, _, ok := codexStatusErrorClassification(http.StatusBadRequest, body) + return ok && code == "thinking_signature_invalid" } func codexTerminalErrorBody(eventData []byte, path string) []byte { @@ -217,6 +239,482 @@ func translateCodexRequestPair(from, to sdktranslator.Format, model string, orig return originalTranslated, body } +type codexReasoningReplayScope struct { + modelName string + sessionKey string +} + +func (s codexReasoningReplayScope) valid() bool { + return strings.TrimSpace(s.modelName) != "" && strings.TrimSpace(s.sessionKey) != "" +} + +func applyCodexReasoningReplayCache(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) ([]byte, codexReasoningReplayScope) { + scope := codexReasoningReplayScopeFromRequest(ctx, from, req, opts, body) + if !scope.valid() { + return body, scope + } + items, ok := internalcache.GetCodexReasoningReplayItems(scope.modelName, scope.sessionKey) + if !ok { + return body, scope + } + items = filterCodexReasoningReplayItemsForInput(body, items) + if len(items) == 0 { + return body, scope + } + updated, ok := insertCodexReasoningReplayItems(body, items) + if !ok { + return body, scope + } + return updated, scope +} + +func codexReasoningReplayScopeFromRequest(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) codexReasoningReplayScope { + if !codexReasoningReplayEnabledForSource(from) { + return codexReasoningReplayScope{} + } + return codexReasoningReplayScope{ + modelName: thinking.ParseSuffix(req.Model).ModelName, + sessionKey: codexReasoningReplaySessionKey(ctx, from, req, opts, body), + } +} + +func codexReasoningReplayEnabledForSource(from sdktranslator.Format) bool { + return sourceFormatEqual(from, sdktranslator.FormatClaude) +} + +func sourceFormatEqual(from, want sdktranslator.Format) bool { + return strings.EqualFold(strings.TrimSpace(from.String()), want.String()) +} + +func codexClaudeCodeReplaySessionKey(payload []byte) string { + sessionID := extractClaudeCodeSessionIDForCodexReplay(payload) + if sessionID == "" { + return "" + } + return "claude:" + sessionID +} + +func codexClaudeCodePromptCacheStorageKey(req cliproxyexecutor.Request) string { + sessionID := extractClaudeCodeSessionIDForCodexReplay(req.Payload) + if sessionID == "" { + return "" + } + return fmt.Sprintf("%s-claude:%s", req.Model, sessionID) +} + +func codexClaudeCodePromptCache(req cliproxyexecutor.Request) (helps.CodexCache, bool) { + key := codexClaudeCodePromptCacheStorageKey(req) + if key == "" { + return helps.CodexCache{}, false + } + if cache, ok := helps.GetCodexCache(key); ok { + return cache, true + } + cache := helps.CodexCache{ + ID: uuid.New().String(), + Expire: time.Now().Add(1 * time.Hour), + } + helps.SetCodexCache(key, cache) + return cache, true +} + +func extractClaudeCodeSessionIDForCodexReplay(payload []byte) string { + if len(payload) == 0 { + return "" + } + userID := gjson.GetBytes(payload, "metadata.user_id").String() + if userID == "" { + return "" + } + if matches := codexClaudeCodeSessionPattern.FindStringSubmatch(userID); len(matches) >= 2 { + return matches[1] + } + if len(userID) > 0 && userID[0] == '{' { + return gjson.Get(userID, "session_id").String() + } + return "" +} + +func codexReasoningReplaySessionKey(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) string { + if ctx == nil { + ctx = context.Background() + } + if value := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { + return "execution:" + value + } + if value := metadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { + return "execution:" + value + } + if value := codexReasoningReplaySessionKeyFromPayload(body); value != "" { + return value + } + if value := codexReasoningReplaySessionKeyFromPayload(req.Payload); value != "" { + return value + } + if value := codexReasoningReplaySessionKeyFromHeaders(opts.Headers); value != "" { + return value + } + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { + if value := codexReasoningReplaySessionKeyFromHeaders(ginCtx.Request.Header); value != "" { + return value + } + } + if sourceFormatEqual(from, sdktranslator.FormatClaude) { + return codexClaudeCodeReplaySessionKey(req.Payload) + } + if sourceFormatEqual(from, sdktranslator.FormatOpenAI) { + if apiKey := strings.TrimSpace(helps.APIKeyFromContext(ctx)); apiKey != "" { + return "prompt-cache:" + uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String() + } + } + return "" +} + +func metadataString(metadata map[string]any, key string) string { + if len(metadata) == 0 { + return "" + } + raw, ok := metadata[key] + if !ok || raw == nil { + return "" + } + switch v := raw.(type) { + case string: + return strings.TrimSpace(v) + case []byte: + return strings.TrimSpace(string(v)) + default: + return "" + } +} + +func codexReasoningReplaySessionKeyFromPayload(payload []byte) string { + if len(payload) == 0 { + return "" + } + if promptCacheKey := strings.TrimSpace(gjson.GetBytes(payload, "prompt_cache_key").String()); promptCacheKey != "" { + return "prompt-cache:" + promptCacheKey + } + if windowID := strings.TrimSpace(gjson.GetBytes(payload, "client_metadata.x-codex-window-id").String()); windowID != "" { + return "window:" + windowID + } + if turnMetadata := strings.TrimSpace(gjson.GetBytes(payload, "client_metadata.x-codex-turn-metadata").String()); turnMetadata != "" { + return codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata) + } + return "" +} + +func codexReasoningReplaySessionKeyFromHeaders(headers http.Header) string { + if headers == nil { + return "" + } + if turnMetadata := strings.TrimSpace(headers.Get("X-Codex-Turn-Metadata")); turnMetadata != "" { + if key := codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata); key != "" { + return key + } + } + if windowID := strings.TrimSpace(headerValueCaseInsensitive(headers, "X-Codex-Window-Id")); windowID != "" { + return "window:" + windowID + } + for _, headerName := range []string{"Session_id", "session_id", "Session-Id"} { + if value := strings.TrimSpace(headerValueCaseInsensitive(headers, headerName)); value != "" { + return "session-id:" + value + } + } + if conversationID := strings.TrimSpace(headerValueCaseInsensitive(headers, "Conversation_id")); conversationID != "" { + return "conversation_id:" + conversationID + } + return "" +} + +func codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata string) string { + if promptCacheKey := strings.TrimSpace(gjson.Get(turnMetadata, "prompt_cache_key").String()); promptCacheKey != "" { + return "prompt-cache:" + promptCacheKey + } + if windowID := strings.TrimSpace(gjson.Get(turnMetadata, "window_id").String()); windowID != "" { + return "window:" + windowID + } + return "" +} + +func codexInputHasValidReasoningEncryptedContent(body []byte) bool { + input := gjson.GetBytes(body, "input") + if !input.IsArray() { + return false + } + for _, item := range input.Array() { + if strings.TrimSpace(item.Get("type").String()) != "reasoning" { + continue + } + encryptedContent := item.Get("encrypted_content") + if encryptedContent.Type != gjson.String { + continue + } + if _, err := signature.InspectGPTReasoningSignature(encryptedContent.String()); err == nil { + return true + } + } + return false +} + +func filterCodexReasoningReplayItemsForInput(body []byte, items [][]byte) [][]byte { + input := gjson.GetBytes(body, "input") + if !input.IsArray() { + return nil + } + + hasInputReasoning := codexInputHasValidReasoningEncryptedContent(body) + existingCalls := make(map[string]bool) + for _, inputItem := range input.Array() { + for _, key := range codexReplayToolCallKeys(inputItem) { + existingCalls[key] = true + } + } + + filtered := make([][]byte, 0, len(items)) + for _, item := range items { + itemResult := gjson.ParseBytes(item) + switch strings.TrimSpace(itemResult.Get("type").String()) { + case "reasoning": + if hasInputReasoning { + continue + } + case "function_call", "custom_tool_call": + keys := codexReplayToolCallKeys(itemResult) + if len(keys) == 0 || codexReplayAnyToolCallKeyExists(existingCalls, keys) { + continue + } + for _, key := range keys { + existingCalls[key] = true + } + default: + continue + } + filtered = append(filtered, item) + } + return filtered +} + +func insertCodexReasoningReplayItems(body []byte, replayItems [][]byte) ([]byte, bool) { + input := gjson.GetBytes(body, "input") + if !input.IsArray() || len(replayItems) == 0 { + return body, false + } + inputItems := input.Array() + insertIndex := codexReasoningReplayInsertIndex(inputItems, replayItems) + replayItems = codexAlignReasoningReplayToolCallIDs(inputItems, replayItems) + items := make([]string, 0, len(inputItems)+len(replayItems)) + for i, inputItem := range inputItems { + if i == insertIndex { + for _, replayItem := range replayItems { + items = append(items, string(replayItem)) + } + } + items = append(items, inputItem.Raw) + } + if insertIndex == len(inputItems) { + for _, replayItem := range replayItems { + items = append(items, string(replayItem)) + } + } + updated, err := sjson.SetRawBytes(body, "input", []byte("["+strings.Join(items, ",")+"]")) + if err != nil { + return body, false + } + return updated, true +} + +func codexReasoningReplayInsertIndex(inputItems []gjson.Result, replayItems [][]byte) int { + replayCallIDs := make(map[string]bool) + for _, replayItem := range replayItems { + itemResult := gjson.ParseBytes(replayItem) + itemType := strings.TrimSpace(itemResult.Get("type").String()) + if itemType != "function_call" && itemType != "custom_tool_call" { + continue + } + for _, callID := range codexReplayComparableCallIDs(itemResult.Get("call_id").String()) { + replayCallIDs[callID] = true + } + } + if len(replayCallIDs) > 0 { + for index, inputItem := range inputItems { + itemType := strings.TrimSpace(inputItem.Get("type").String()) + if itemType != "function_call_output" && itemType != "custom_tool_call_output" { + continue + } + callID := strings.TrimSpace(inputItem.Get("call_id").String()) + if callID == "" || replayCallIDs[callID] { + return index + } + } + } + for index := len(inputItems) - 1; index >= 0; index-- { + inputItem := inputItems[index] + if strings.TrimSpace(inputItem.Get("type").String()) == "message" && strings.TrimSpace(inputItem.Get("role").String()) == "assistant" { + return index + } + } + for index, inputItem := range inputItems { + if shouldInsertCodexReasoningReplayBefore(inputItem) { + return index + } + } + return len(inputItems) +} + +func codexAlignReasoningReplayToolCallIDs(inputItems []gjson.Result, replayItems [][]byte) [][]byte { + outputCallIDs := codexReplayOutputCallIDs(inputItems) + if len(outputCallIDs) == 0 { + return replayItems + } + + aligned := make([][]byte, 0, len(replayItems)) + for _, replayItem := range replayItems { + itemResult := gjson.ParseBytes(replayItem) + itemType := strings.TrimSpace(itemResult.Get("type").String()) + if itemType != "function_call" && itemType != "custom_tool_call" { + aligned = append(aligned, replayItem) + continue + } + + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + outputCallID := "" + for _, candidate := range codexReplayComparableCallIDs(callID) { + if value := outputCallIDs[candidate]; value != "" { + outputCallID = value + break + } + } + if outputCallID == "" || outputCallID == callID { + aligned = append(aligned, replayItem) + continue + } + + updated, err := sjson.SetBytes(replayItem, "call_id", outputCallID) + if err != nil { + aligned = append(aligned, replayItem) + continue + } + aligned = append(aligned, updated) + } + return aligned +} + +func codexReplayOutputCallIDs(inputItems []gjson.Result) map[string]string { + outputCallIDs := make(map[string]string) + for _, inputItem := range inputItems { + itemType := strings.TrimSpace(inputItem.Get("type").String()) + if itemType != "function_call_output" && itemType != "custom_tool_call_output" { + continue + } + callID := strings.TrimSpace(inputItem.Get("call_id").String()) + if callID == "" { + continue + } + for _, candidate := range codexReplayComparableCallIDs(callID) { + outputCallIDs[candidate] = callID + } + } + return outputCallIDs +} + +func shouldInsertCodexReasoningReplayBefore(item gjson.Result) bool { + if strings.TrimSpace(item.Get("type").String()) != "message" { + return true + } + switch strings.TrimSpace(item.Get("role").String()) { + case "developer", "system": + return false + default: + return true + } +} + +func codexReplayToolCallKeys(item gjson.Result) []string { + itemType := strings.TrimSpace(item.Get("type").String()) + if itemType != "function_call" && itemType != "custom_tool_call" { + return nil + } + callIDs := codexReplayComparableCallIDs(item.Get("call_id").String()) + if len(callIDs) == 0 { + return nil + } + keys := make([]string, 0, len(callIDs)) + for _, callID := range callIDs { + keys = append(keys, itemType+":"+callID) + } + return keys +} + +func codexReplayAnyToolCallKeyExists(existing map[string]bool, keys []string) bool { + for _, key := range keys { + if existing[key] { + return true + } + } + return false +} + +func codexReplayComparableCallIDs(callID string) []string { + callID = strings.TrimSpace(callID) + if callID == "" { + return nil + } + + claudeVisibleCallID := shortenCodexReplayCallIDIfNeeded(util.SanitizeClaudeToolID(callID)) + if claudeVisibleCallID == "" || claudeVisibleCallID == callID { + return []string{callID} + } + return []string{callID, claudeVisibleCallID} +} + +func shortenCodexReplayCallIDIfNeeded(id string) string { + const limit = 64 + if len(id) <= limit { + return id + } + + sum := sha256.Sum256([]byte(id)) + suffix := "_" + hex.EncodeToString(sum[:8]) + prefixLen := limit - len(suffix) + if prefixLen <= 0 { + return suffix[len(suffix)-limit:] + } + return id[:prefixLen] + suffix +} + +func cacheCodexReasoningReplayFromCompleted(scope codexReasoningReplayScope, completedData []byte) { + if !scope.valid() { + return + } + output := gjson.GetBytes(completedData, "response.output") + if !output.IsArray() { + return + } + items := make([][]byte, 0, len(output.Array())) + for _, item := range output.Array() { + switch strings.TrimSpace(item.Get("type").String()) { + case "reasoning", "function_call", "custom_tool_call": + items = append(items, []byte(item.Raw)) + default: + continue + } + } + if !internalcache.CacheCodexReasoningReplayItems(scope.modelName, scope.sessionKey, items) { + internalcache.DeleteCodexReasoningReplayItem(scope.modelName, scope.sessionKey) + } +} + +func clearCodexReasoningReplayOnInvalidSignature(scope codexReasoningReplayScope, statusCode int, body []byte) { + if !scope.valid() { + return + } + code, _, ok := codexStatusErrorClassification(statusCode, body) + if ok && code == "thinking_signature_invalid" { + internalcache.DeleteCodexReasoningReplayItem(scope.modelName, scope.sessionKey) + } +} + // PrepareRequest injects Codex credentials into the outgoing HTTP request. func (e *CodexExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error { if req == nil { @@ -295,6 +793,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body = ensureImageGenerationTool(body, baseModel, auth) } body = sanitizeOpenAIResponsesReasoningEncryptedContent(ctx, "codex executor", body) + body, replayScope := applyCodexReasoningReplayCache(ctx, from, req, opts, body) reporter.SetTranslatedReasoningEffort(body, to.String()) url := strings.TrimSuffix(baseURL, "/") + "/responses" @@ -338,6 +837,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) b = applyCodexIdentityConfuseResponsePayload(b, identityState) + clearCodexReasoningReplayOnInvalidSignature(replayScope, httpResp.StatusCode, b) helps.AppendAPIResponseChunk(ctx, e.cfg, b) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = newCodexStatusErr(httpResp.StatusCode, b) @@ -362,7 +862,8 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re eventData := bytes.TrimSpace(line[5:]) eventType := gjson.GetBytes(eventData, "type").String() - if streamErr, ok := codexTerminalStreamContextLengthErr(eventData); ok { + if streamErr, terminalBody, ok := codexTerminalStreamErr(eventData); ok { + clearCodexReasoningReplayOnInvalidSignature(replayScope, streamErr.StatusCode(), terminalBody) err = streamErr return resp, err } @@ -412,6 +913,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re } completedData = completedDataPatched } + cacheCodexReasoningReplayFromCompleted(replayScope, completedData) var param any clientCompletedData := applyCodexIdentityExposeResponsePayload(completedData, identityState) @@ -566,6 +1068,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au body = ensureImageGenerationTool(body, baseModel, auth) } body = sanitizeOpenAIResponsesReasoningEncryptedContent(ctx, "codex executor", body) + body, replayScope := applyCodexReasoningReplayCache(ctx, from, req, opts, body) reporter.SetTranslatedReasoningEffort(body, to.String()) url := strings.TrimSuffix(baseURL, "/") + "/responses" @@ -612,6 +1115,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au return nil, readErr } data = applyCodexIdentityConfuseResponsePayload(data, identityState) + clearCodexReasoningReplayOnInvalidSignature(replayScope, httpResp.StatusCode, data) helps.AppendAPIResponseChunk(ctx, e.cfg, data) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) err = newCodexStatusErr(httpResp.StatusCode, data) @@ -637,7 +1141,8 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au if bytes.HasPrefix(line, dataTag) { data := bytes.TrimSpace(line[5:]) - if streamErr, ok := codexTerminalStreamContextLengthErr(data); ok { + if streamErr, terminalBody, ok := codexTerminalStreamErr(data); ok { + clearCodexReasoningReplayOnInvalidSignature(replayScope, streamErr.StatusCode(), terminalBody) helps.RecordAPIResponseError(ctx, e.cfg, streamErr) reporter.PublishFailure(ctx, streamErr) select { @@ -655,6 +1160,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } publishCodexImageToolUsage(ctx, reporter, body, data) data = patchCodexCompletedOutput(data, outputItemsByIndex, outputItemsFallback) + cacheCodexReasoningReplayFromCompleted(replayScope, data) translatedLine = append([]byte("data: "), data...) } } @@ -895,25 +1401,16 @@ type codexIdentityReplacement struct { func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Format, url string, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, userPayload []byte, rawJSON []byte) (*http.Request, []byte, codexIdentityConfuseState, error) { var cache helps.CodexCache - if from == "claude" { - userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") - if userIDResult.Exists() { - key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) - var ok bool - if cache, ok = helps.GetCodexCache(key); !ok { - cache = helps.CodexCache{ - ID: uuid.New().String(), - Expire: time.Now().Add(1 * time.Hour), - } - helps.SetCodexCache(key, cache) - } + if sourceFormatEqual(from, sdktranslator.FormatClaude) { + if cached, ok := codexClaudeCodePromptCache(req); ok { + cache = cached } - } else if from == "openai-response" { + } else if sourceFormatEqual(from, sdktranslator.FormatOpenAIResponse) { promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") if promptCacheKey.Exists() { cache.ID = promptCacheKey.String() } - } else if from == "openai" { + } else if sourceFormatEqual(from, sdktranslator.FormatOpenAI) { if apiKey := strings.TrimSpace(helps.APIKeyFromContext(ctx)); apiKey != "" { cache.ID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String() } @@ -978,10 +1475,7 @@ func applyCodexIdentityConfuseHeaders(headers http.Header, state *codexIdentityC return } - setHeaderCasePreserved(headers, "Session-Id", state.promptCacheKey) - if headerValueCaseInsensitive(headers, "session_id") != "" { - setHeaderCasePreserved(headers, "session_id", state.promptCacheKey) - } + setCodexSessionHeaderCasePreserved(headers, "Session_id", state.promptCacheKey) if headerValueCaseInsensitive(headers, "Conversation_id") != "" { setHeaderCasePreserved(headers, "Conversation_id", state.promptCacheKey) } diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index 3f7d412ba..d33d7fc64 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -47,8 +47,11 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != "" { t.Fatalf("Conversation_id = %q, want empty", gotConversation) } - if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey { - t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey) + if gotSession := httpReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != expectedKey { + t.Fatalf("Session_id = %#v, want [%q]", gotSession, expectedKey) + } + if gotCanonicalSession := httpReq.Header.Get("Session-Id"); gotCanonicalSession != "" { + t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession) } httpReq2, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, nil, req, req.Payload, rawJSON) @@ -65,6 +68,88 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom } } +func TestCodexExecutorCacheHelper_ClaudeUsesClaudeCodeSessionID(t *testing.T) { + executor := &CodexExecutor{} + ctx := context.Background() + url := "https://example.com/responses" + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + firstReq := cliproxyexecutor.Request{ + Model: "gpt-5.4-claude-cache-session", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"cache-session-1\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"first"}]}] + }`), + } + secondReq := cliproxyexecutor.Request{ + Model: "gpt-5.4-claude-cache-session", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-b\",\"account_uuid\":\"\",\"session_id\":\"cache-session-1\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"next"}]}] + }`), + } + + firstHTTPReq, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("claude"), url, nil, firstReq, firstReq.Payload, rawJSON) + if err != nil { + t.Fatalf("cacheHelper first error: %v", err) + } + secondHTTPReq, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("claude"), url, nil, secondReq, secondReq.Payload, rawJSON) + if err != nil { + t.Fatalf("cacheHelper second error: %v", err) + } + + firstBody, errRead := io.ReadAll(firstHTTPReq.Body) + if errRead != nil { + t.Fatalf("read first request body: %v", errRead) + } + secondBody, errRead := io.ReadAll(secondHTTPReq.Body) + if errRead != nil { + t.Fatalf("read second request body: %v", errRead) + } + firstKey := gjson.GetBytes(firstBody, "prompt_cache_key").String() + secondKey := gjson.GetBytes(secondBody, "prompt_cache_key").String() + if firstKey == "" { + t.Fatalf("first prompt_cache_key is empty; body=%s", string(firstBody)) + } + if secondKey != firstKey { + t.Fatalf("same Claude Code session_id produced different prompt_cache_key: first=%q second=%q", firstKey, secondKey) + } + if gotSession := firstHTTPReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != firstKey { + t.Fatalf("first Session_id = %#v, want [%q]", gotSession, firstKey) + } + if gotSession := secondHTTPReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != firstKey { + t.Fatalf("second Session_id = %#v, want [%q]", gotSession, firstKey) + } +} + +func TestCodexExecutorCacheHelper_ClaudeRejectsBareUserID(t *testing.T) { + executor := &CodexExecutor{} + req := cliproxyexecutor.Request{ + Model: "gpt-5.4-claude-cache-bare-user", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]}`), + } + + httpReq, _, _, err := executor.cacheHelper(context.Background(), sdktranslator.FromString("claude"), "https://example.com/responses", nil, req, req.Payload, []byte(`{"model":"gpt-5.4","stream":true}`)) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, errRead := io.ReadAll(httpReq.Body) + if errRead != nil { + t.Fatalf("read request body: %v", errRead) + } + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "" { + t.Fatalf("bare metadata.user_id must not create prompt_cache_key, got %q; body=%s", got, string(body)) + } + if got := httpReq.Header["Session_id"]; len(got) != 0 { + t.Fatalf("bare metadata.user_id must not create Session_id, got %#v", got) + } + if got := httpReq.Header.Get("Session-Id"); got != "" { + t.Fatalf("bare metadata.user_id must not create Session-Id, got %q", got) + } +} + func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing.T) { recorder := httptest.NewRecorder() ginCtx, _ := gin.CreateTestContext(recorder) @@ -114,13 +199,16 @@ func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing if gotWindowID := gjson.GetBytes(body, "client_metadata.x-codex-window-id").String(); gotWindowID != expectedPromptCacheKey+":0" { t.Fatalf("client_metadata.x-codex-window-id = %q, want %q", gotWindowID, expectedPromptCacheKey+":0") } - for _, headerName := range []string{"Session-Id", "X-Client-Request-Id", "Thread-Id"} { + if gotHeader := httpReq.Header["Session_id"]; len(gotHeader) != 1 || gotHeader[0] != expectedPromptCacheKey { + t.Fatalf("Session_id = %#v, want [%q]", gotHeader, expectedPromptCacheKey) + } + for _, headerName := range []string{"X-Client-Request-Id", "Thread-Id"} { if gotHeader := httpReq.Header.Get(headerName); gotHeader != expectedPromptCacheKey { t.Fatalf("%s = %q, want %q", headerName, gotHeader, expectedPromptCacheKey) } } - if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedPromptCacheKey { - t.Fatalf("Session_id = %q, want %q", gotSession, expectedPromptCacheKey) + if gotCanonicalSession := httpReq.Header.Get("Session-Id"); gotCanonicalSession != "" { + t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession) } if gotWindow := httpReq.Header.Get("X-Codex-Window-Id"); gotWindow != expectedPromptCacheKey+":0" { t.Fatalf("X-Codex-Window-Id = %q, want %q", gotWindow, expectedPromptCacheKey+":0") diff --git a/internal/runtime/executor/codex_executor_reasoning_replay_cache_test.go b/internal/runtime/executor/codex_executor_reasoning_replay_cache_test.go new file mode 100644 index 000000000..a15007ed3 --- /dev/null +++ b/internal/runtime/executor/codex_executor_reasoning_replay_cache_test.go @@ -0,0 +1,803 @@ +package executor + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gin-gonic/gin" + internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" + "github.com/router-for-me/CLIProxyAPI/v7/internal/config" + _ "github.com/router-for-me/CLIProxyAPI/v7/internal/translator" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator" + "github.com/tidwall/gjson" +) + +func validCodexReasoningEncryptedContentForTestSeed(seed byte) string { + payload := make([]byte, 1+8+16+16+32) + payload[0] = 0x80 + for i := 9; i < len(payload); i++ { + payload[i] = seed + byte(i) + } + return base64.RawURLEncoding.EncodeToString(payload) +} + +func shortenedCodexReplayCallIDForTest(id string) string { + const limit = 64 + if len(id) <= limit { + return id + } + + sum := sha256.Sum256([]byte(id)) + suffix := "_" + hex.EncodeToString(sum[:8]) + prefixLen := limit - len(suffix) + if prefixLen <= 0 { + return suffix[len(suffix)-limit:] + } + return id[:prefixLen] + suffix +} + +func TestCodexExecutorReasoningReplayCacheStoresFinalDoneAndInjectsNextClaudeRequest(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + addedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(1) + doneEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(2) + var bodies [][]byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + bodies = append(bodies, body) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"rs_added","type":"reasoning","status":"in_progress","summary":[],"encrypted_content":"` + addedEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + doneEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-replay-1", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + } + + _, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`), + }, opts) + if err != nil { + t.Fatalf("first Execute error: %v", err) + } + + _, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, opts) + if err != nil { + t.Fatalf("second Execute error: %v", err) + } + + if len(bodies) != 2 { + t.Fatalf("upstream request count = %d, want 2", len(bodies)) + } + secondBody := bodies[1] + if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" { + t.Fatalf("input.0.type = %q, want reasoning; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != doneEncryptedContent { + t.Fatalf("injected encrypted_content = %q, want final done %q; body=%s", got, doneEncryptedContent, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.1.role").String(); got != "user" { + t.Fatalf("input.1.role = %q, want user; body=%s", got, string(secondBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheSharesSameSessionAcrossClientKeys(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + from := sdktranslator.FromString("claude") + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-only\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + } + opts := cliproxyexecutor.Options{SourceFormat: from} + body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`) + encryptedContent := validCodexReasoningEncryptedContentForTestSeed(11) + + firstScope := codexReasoningReplayScopeFromRequest(codexReplaySessionOnlyContext("client-key-a"), from, req, opts, body) + if !firstScope.valid() { + t.Fatalf("first replay scope is invalid: %#v", firstScope) + } + cacheCodexReasoningReplayFromCompleted(firstScope, []byte(`{"response":{"output":[{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}]}}`)) + + secondBody, secondScope := applyCodexReasoningReplayCache(codexReplaySessionOnlyContext("client-key-b"), from, req, opts, body) + if secondScope != firstScope { + t.Fatalf("replay scope should ignore client API key for the same session: first=%#v second=%#v", firstScope, secondScope) + } + if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" { + t.Fatalf("input.0.type = %q, want same-session replay; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != encryptedContent { + t.Fatalf("injected encrypted_content = %q, want cached value", got) + } +} + +func TestCodexExecutorReasoningReplaySessionKeyUsesClaudeCodeJSONSessionID(t *testing.T) { + from := sdktranslator.FromString("claude") + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"session-json-1\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"next"}]}] + }`), + } + body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`) + + got := codexReasoningReplaySessionKey(context.Background(), from, req, cliproxyexecutor.Options{SourceFormat: from}, body) + if got != "claude:session-json-1" { + t.Fatalf("codexReasoningReplaySessionKey() = %q, want claude:session-json-1", got) + } +} + +func TestCodexExecutorReasoningReplaySessionKeyRejectsBareClaudeUserID(t *testing.T) { + from := sdktranslator.FromString("claude") + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + } + body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`) + + got := codexReasoningReplaySessionKey(context.Background(), from, req, cliproxyexecutor.Options{SourceFormat: from}, body) + if got != "" { + t.Fatalf("bare metadata.user_id must not become replay session key, got %q", got) + } +} + +func TestCodexExecutorReasoningReplaySessionKeyCanonicalizesSessionHeaderAliases(t *testing.T) { + legacy := http.Header{"Session_id": []string{"session-alias"}} + lowercase := http.Header{"session_id": []string{"session-alias"}} + canonical := http.Header{"Session-Id": []string{"session-alias"}} + + gotLegacy := codexReasoningReplaySessionKeyFromHeaders(legacy) + gotLowercase := codexReasoningReplaySessionKeyFromHeaders(lowercase) + gotCanonical := codexReasoningReplaySessionKeyFromHeaders(canonical) + + if gotLegacy != gotLowercase || gotLowercase != gotCanonical { + t.Fatalf("session header aliases produced different keys: legacy=%q lowercase=%q canonical=%q", gotLegacy, gotLowercase, gotCanonical) + } + if gotCanonical != "session-id:session-alias" { + t.Fatalf("canonical session key = %q, want session-id:session-alias", gotCanonical) + } +} + +func TestCodexExecutorReasoningReplaySessionKeyCanonicalizesWindowHeaderWithPayload(t *testing.T) { + payload := []byte(`{"client_metadata":{"x-codex-window-id":"window-1"}}`) + headers := http.Header{"X-Codex-Window-Id": []string{"window-1"}} + + gotPayload := codexReasoningReplaySessionKeyFromPayload(payload) + gotHeader := codexReasoningReplaySessionKeyFromHeaders(headers) + + if gotPayload != gotHeader { + t.Fatalf("window replay keys differ: payload=%q header=%q", gotPayload, gotHeader) + } + if gotHeader != "window:window-1" { + t.Fatalf("window replay key = %q, want window:window-1", gotHeader) + } +} + +func TestCodexExecutorReasoningReplayCacheSharesSameSessionAcrossCodexAuths(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + encryptedContent := validCodexReasoningEncryptedContentForTestSeed(12) + var bodies [][]byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + bodies = append(bodies, body) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + encryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + firstAuth := &cliproxyauth.Auth{ + ID: "auth-replay-session-auth-a", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test-a", + }, + } + secondAuth := &cliproxyauth.Auth{ + ID: "auth-replay-session-auth-b", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test-b", + }, + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + } + + _, err := executor.Execute(context.Background(), firstAuth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-auth-switch\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`), + }, opts) + if err != nil { + t.Fatalf("first Execute error: %v", err) + } + + _, err = executor.Execute(context.Background(), secondAuth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-auth-switch\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, opts) + if err != nil { + t.Fatalf("second Execute error: %v", err) + } + + if len(bodies) != 2 { + t.Fatalf("upstream request count = %d, want 2", len(bodies)) + } + secondBody := bodies[1] + if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" { + t.Fatalf("input.0.type = %q, want same-session replay across auths; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != encryptedContent { + t.Fatalf("injected encrypted_content = %q, want cached value", got) + } +} + +func codexReplaySessionOnlyContext(apiKey string) context.Context { + recorder := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(recorder) + ginCtx.Set("userApiKey", apiKey) + ginCtx.Set("accessProvider", "config-inline") + ginCtx.Request = httptest.NewRequest("POST", "/v1/messages", nil) + return context.WithValue(context.Background(), "gin", ginCtx) +} + +func TestCodexExecutorReasoningReplayCacheDoesNotInjectNativeResponsesRequest(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(3) + internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "prompt-cache:native-session", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`)) + + var gotBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + gotBody = body + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + _, err := executor.Execute(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-native", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","prompt_cache_key":"native-session","input":[{"role":"user","content":"native"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai-response"), + Stream: false, + }) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + + if got := gjson.GetBytes(gotBody, "input.0.type").String(); got == "reasoning" { + t.Fatalf("native Responses request should not receive cached reasoning; body=%s", string(gotBody)) + } + if got := gjson.GetBytes(gotBody, "input.0.role").String(); got != "user" { + t.Fatalf("input.0.role = %q, want user; body=%s", got, string(gotBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheDoesNotStoreNativeResponsesRequest(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + nativeEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(4) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[{"id":"rs_native","type":"reasoning","summary":[],"encrypted_content":"` + nativeEncryptedContent + `"}]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + _, err := executor.Execute(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-native-store", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","prompt_cache_key":"native-store","input":[{"role":"user","content":"native"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai-response"), + Stream: false, + }) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + + if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "prompt-cache:native-store"); ok { + t.Fatal("native Responses request should not populate Codex reasoning replay cache") + } +} + +func TestCodexExecutorReasoningReplayCacheDoesNotDuplicateClaudeClientReasoning(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(5) + clientEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(6) + internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-2", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`)) + + var gotBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + gotBody = body + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + _, err := executor.Execute(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-2", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-2\"}"},"messages":[{"role":"assistant","content":[{"type":"thinking","thinking":"client summary","signature":"` + clientEncryptedContent + `"},{"type":"text","text":"answer"}]},{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + }) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + + if got := gjson.GetBytes(gotBody, "input.0.encrypted_content").String(); got != clientEncryptedContent { + t.Fatalf("client reasoning should be preserved, got %q want %q; body=%s", got, clientEncryptedContent, string(gotBody)) + } + reasoningCount := 0 + for _, item := range gjson.GetBytes(gotBody, "input").Array() { + if item.Get("type").String() == "reasoning" { + reasoningCount++ + } + } + if reasoningCount != 1 { + t.Fatalf("reasoning item count = %d, want 1; body=%s", reasoningCount, string(gotBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheInsertsReasoningBeforeAssistantOutputInClaudeHistory(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(7) + internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-history", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`)) + + var gotBody []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + gotBody = body + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + _, err := executor.Execute(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-history", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-history\"}"}, + "messages":[ + {"role":"user","content":[{"type":"text","text":"first"}]}, + {"role":"assistant","content":[{"type":"text","text":"answer"}]}, + {"role":"user","content":[{"type":"text","text":"next"}]} + ] + }`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + }) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + + if got := gjson.GetBytes(gotBody, "input.0.role").String(); got != "user" { + t.Fatalf("input.0.role = %q, want first user message; body=%s", got, string(gotBody)) + } + if got := gjson.GetBytes(gotBody, "input.1.type").String(); got != "reasoning" { + t.Fatalf("input.1.type = %q, want cached reasoning before assistant output; body=%s", got, string(gotBody)) + } + if got := gjson.GetBytes(gotBody, "input.1.encrypted_content").String(); got != cachedEncryptedContent { + t.Fatalf("input.1.encrypted_content = %q, want cached reasoning; body=%s", got, string(gotBody)) + } + if got := gjson.GetBytes(gotBody, "input.2.role").String(); got != "assistant" { + t.Fatalf("input.2.role = %q, want assistant output after cached reasoning; body=%s", got, string(gotBody)) + } + if got := gjson.GetBytes(gotBody, "input.3.role").String(); got != "user" { + t.Fatalf("input.3.role = %q, want final user message; body=%s", got, string(gotBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheExecuteStreamStoresFinalDoneForClaude(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + addedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(7) + doneEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(8) + var bodies [][]byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + bodies = append(bodies, body) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"rs_added","type":"reasoning","status":"in_progress","summary":[],"encrypted_content":"` + addedEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + doneEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-replay-stream", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + } + + streamResult, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"stream-session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: true, + }) + if err != nil { + t.Fatalf("ExecuteStream error: %v", err) + } + for chunk := range streamResult.Chunks { + if chunk.Err != nil { + t.Fatalf("stream chunk error: %v", chunk.Err) + } + } + + _, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"stream-session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + }) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + + if len(bodies) != 2 { + t.Fatalf("upstream request count = %d, want 2", len(bodies)) + } + secondBody := bodies[1] + if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != doneEncryptedContent { + t.Fatalf("stream cached encrypted_content = %q, want final done %q; body=%s", got, doneEncryptedContent, string(secondBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheClearsOnNonStreamResponseFailedInvalidSignature(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(9) + internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-nonstream", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`)) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"message":"Invalid signature in thinking block","type":"invalid_request_error","code":"invalid_request_error"}}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + _, err := executor.Execute(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-invalid-nonstream", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-invalid-nonstream\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + }) + if err == nil { + t.Fatal("expected invalid signature error") + } + if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-nonstream"); ok { + t.Fatal("invalid signature response.failed should clear cached replay item") + } +} + +func TestCodexExecutorReasoningReplayCacheClearsOnStreamResponseFailedInvalidSignature(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(10) + internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-stream", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`)) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"message":"Invalid signature in thinking block","type":"invalid_request_error","code":"invalid_request_error"}}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + streamResult, err := executor.ExecuteStream(context.Background(), &cliproxyauth.Auth{ + ID: "auth-replay-invalid-stream", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + }, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-invalid-stream\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: true, + }) + if err != nil { + t.Fatalf("ExecuteStream setup error: %v", err) + } + + gotChunkErr := false + for chunk := range streamResult.Chunks { + if chunk.Err != nil { + gotChunkErr = true + } + } + if !gotChunkErr { + t.Fatal("expected stream chunk error for invalid signature response.failed") + } + if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-stream"); ok { + t.Fatal("invalid signature response.failed should clear cached replay item") + } +} + +func TestCodexExecutorReasoningReplayCacheReplaysFunctionCallForClaudeToolResult(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + reasoningEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(8) + var bodies [][]byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + bodies = append(bodies, body) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_1","type":"reasoning","summary":[],"encrypted_content":"` + reasoningEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","call_id":"call_1","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"in_progress"},"output_index":1}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"fc_1","type":"function_call","call_id":"call_1","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"completed"},"output_index":1}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-replay-claude-tool", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + } + + _, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-tool\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"call lookup"}]}], + "tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}] + }`), + }, opts) + if err != nil { + t.Fatalf("first Execute error: %v", err) + } + + _, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-tool\"}"}, + "messages":[ + {"role":"user","content":[{"type":"text","text":"call lookup"}]}, + {"role":"user","content":[{"type":"tool_result","tool_use_id":"call_1","content":"sunny"}]} + ], + "tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}] + }`), + }, opts) + if err != nil { + t.Fatalf("second Execute error: %v", err) + } + + if len(bodies) != 2 { + t.Fatalf("upstream request count = %d, want 2", len(bodies)) + } + secondBody := bodies[1] + if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "message" { + t.Fatalf("input.0.type = %q, want initial user message; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.1.type").String(); got != "reasoning" { + t.Fatalf("input.1.type = %q, want cached reasoning; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.2.type").String(); got != "function_call" { + t.Fatalf("input.2.type = %q, want cached function_call; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.2.call_id").String(); got != "call_1" { + t.Fatalf("input.2.call_id = %q, want call_1; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.3.type").String(); got != "function_call_output" { + t.Fatalf("input.3.type = %q, want function_call_output after cached call; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.3.call_id").String(); got != "call_1" { + t.Fatalf("input.3.call_id = %q, want call_1; body=%s", got, string(secondBody)) + } +} + +func TestCodexExecutorReasoningReplayCacheMatchesShortenedClaudeToolResultCallID(t *testing.T) { + internalcache.ClearCodexReasoningReplayCache() + t.Cleanup(internalcache.ClearCodexReasoningReplayCache) + + longCallID := "call_" + strings.Repeat("a", 62) + shortCallID := shortenedCodexReplayCallIDForTest(longCallID) + if len(longCallID) <= 64 || len(shortCallID) > 64 || shortCallID == longCallID { + t.Fatalf("invalid test setup: long=%q short=%q", longCallID, shortCallID) + } + + reasoningEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(13) + var bodies [][]byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, errRead := io.ReadAll(r.Body) + if errRead != nil { + t.Fatalf("read body: %v", errRead) + } + bodies = append(bodies, body) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_long","type":"reasoning","summary":[],"encrypted_content":"` + reasoningEncryptedContent + `"},"output_index":0}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"fc_long","type":"function_call","call_id":"` + longCallID + `","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"completed"},"output_index":1}` + "\n")) + _, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n")) + })) + defer server.Close() + + executor := NewCodexExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-replay-claude-short-tool", + Attributes: map[string]string{ + "base_url": server.URL, + "api_key": "test", + }, + } + opts := cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("claude"), + Stream: false, + } + + _, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-short-tool\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"call lookup"}]}], + "tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}] + }`), + }, opts) + if err != nil { + t.Fatalf("first Execute error: %v", err) + } + + _, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{ + "model":"gpt-5.4", + "metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-short-tool\"}"}, + "messages":[ + {"role":"user","content":[{"type":"text","text":"call lookup"}]}, + {"role":"user","content":[{"type":"tool_result","tool_use_id":"` + shortCallID + `","content":"sunny"}]} + ], + "tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}] + }`), + }, opts) + if err != nil { + t.Fatalf("second Execute error: %v", err) + } + + if len(bodies) != 2 { + t.Fatalf("upstream request count = %d, want 2", len(bodies)) + } + secondBody := bodies[1] + if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "message" { + t.Fatalf("input.0.type = %q, want initial user message; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.1.type").String(); got != "reasoning" { + t.Fatalf("input.1.type = %q, want cached reasoning; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.2.type").String(); got != "function_call" { + t.Fatalf("input.2.type = %q, want cached function_call; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.2.call_id").String(); got != shortCallID { + t.Fatalf("input.2.call_id = %q, want shortened call_id %q; body=%s", got, shortCallID, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.3.type").String(); got != "function_call_output" { + t.Fatalf("input.3.type = %q, want function_call_output after cached call; body=%s", got, string(secondBody)) + } + if got := gjson.GetBytes(secondBody, "input.3.call_id").String(); got != shortCallID { + t.Fatalf("input.3.call_id = %q, want shortened call_id %q; body=%s", got, shortCallID, string(secondBody)) + } +} diff --git a/internal/runtime/executor/codex_executor_stream_output_test.go b/internal/runtime/executor/codex_executor_stream_output_test.go index 983f915bc..46a227924 100644 --- a/internal/runtime/executor/codex_executor_stream_output_test.go +++ b/internal/runtime/executor/codex_executor_stream_output_test.go @@ -159,6 +159,13 @@ func TestCodexTerminalStreamContextLengthErrIgnoresOtherTerminalErrors(t *testin } } +func TestCodexTerminalStreamErrIgnoresRateLimitTerminalErrors(t *testing.T) { + _, _, ok := codexTerminalStreamErr([]byte(`{"type":"error","error":{"type":"rate_limit_error","code":"rate_limit_exceeded","message":"Rate limit reached."}}`)) + if ok { + t.Fatal("rate limit terminal error should not be handled by replay terminal error path") + } +} + func statusCodeFromTestError(t *testing.T, err error) int { t.Helper() diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index e1c9ce344..8d68a251e 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -835,21 +835,11 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto } var cache helps.CodexCache - if from == "claude" { - userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") - if userIDResult.Exists() { - key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) - if cached, ok := helps.GetCodexCache(key); ok { - cache = cached - } else { - cache = helps.CodexCache{ - ID: uuid.New().String(), - Expire: time.Now().Add(1 * time.Hour), - } - helps.SetCodexCache(key, cache) - } + if sourceFormatEqual(from, sdktranslator.FormatClaude) { + if cached, ok := codexClaudeCodePromptCache(req); ok { + cache = cached } - } else if from == "openai-response" { + } else if sourceFormatEqual(from, sdktranslator.FormatOpenAIResponse) { if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { cache.ID = promptCacheKey.String() } @@ -899,10 +889,11 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth * betaHeader = codexResponsesWebsocketBetaHeaderValue } headers.Set("OpenAI-Beta", betaHeader) + sessionFallback := "" if strings.Contains(headers.Get("User-Agent"), "Mac OS") { - ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", uuid.NewString()) + sessionFallback = uuid.NewString() } - ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", "") + ensureCodexWebsocketSessionHeader(headers, ginHeaders, sessionFallback) if originator := strings.TrimSpace(ginHeaders.Get("Originator")); originator != "" { headers.Set("Originator", originator) } else if !isAPIKey { @@ -927,6 +918,32 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth * return headers } +func ensureCodexWebsocketSessionHeader(target http.Header, source http.Header, fallbackValue string) { + if target == nil { + return + } + sessionID := codexSessionHeaderValue(target) + if sessionID == "" { + sessionID = codexSessionHeaderValue(source) + } + if sessionID == "" { + sessionID = strings.TrimSpace(fallbackValue) + } + if sessionID != "" { + setHeaderCasePreserved(target, "session_id", sessionID) + } + deleteHeaderCaseInsensitive(target, "Session-Id") +} + +func codexSessionHeaderValue(headers http.Header) string { + for _, key := range []string{"Session-Id", "Session_id", "session_id"} { + if value := strings.TrimSpace(headerValueCaseInsensitive(headers, key)); value != "" { + return value + } + } + return "" +} + func codexAuthUsesAPIKey(auth *cliproxyauth.Auth) bool { if auth == nil || auth.Attributes == nil { return false @@ -969,6 +986,47 @@ func setHeaderCasePreserved(headers http.Header, key string, value string) { headers[key] = []string{value} } +func setCodexSessionHeaderCasePreserved(headers http.Header, fallbackKey string, value string) { + if headers == nil { + return + } + fallbackKey = strings.TrimSpace(fallbackKey) + value = strings.TrimSpace(value) + if fallbackKey == "" || value == "" { + return + } + + selectedKey := "" + if _, ok := headers[fallbackKey]; ok && codexSessionHeaderKeyUsesUnderscore(fallbackKey) { + selectedKey = fallbackKey + } else { + for existingKey := range headers { + if codexSessionHeaderKeyUsesUnderscore(existingKey) { + selectedKey = existingKey + break + } + } + } + if selectedKey == "" { + selectedKey = fallbackKey + } + for existingKey := range headers { + if codexSessionHeaderKey(existingKey) && existingKey != selectedKey { + delete(headers, existingKey) + } + } + headers[selectedKey] = []string{value} +} + +func codexSessionHeaderKey(key string) bool { + normalized := strings.ToLower(strings.TrimSpace(key)) + return normalized == "session_id" || normalized == "session-id" +} + +func codexSessionHeaderKeyUsesUnderscore(key string) bool { + return strings.ToLower(strings.TrimSpace(key)) == "session_id" +} + func headerValueCaseInsensitive(headers http.Header, key string) string { key = strings.TrimSpace(key) if headers == nil || key == "" { diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index 5dbfbce94..a3d3a5525 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -197,7 +197,7 @@ func TestApplyCodexWebsocketHeadersPassesThroughClientIdentityHeaders(t *testing "Version": "0.115.0-alpha.27", "X-Codex-Turn-Metadata": `{"turn_id":"turn-1"}`, "X-Client-Request-Id": "019d2233-e240-7162-992d-38df0a2a0e0d", - "session_id": "legacy-session", + "session-id": "legacy-session", }) headers := applyCodexWebsocketHeaders(ctx, http.Header{}, auth, "", nil) @@ -217,11 +217,32 @@ func TestApplyCodexWebsocketHeadersPassesThroughClientIdentityHeaders(t *testing if got := headers.Get("X-Client-Request-Id"); got != "019d2233-e240-7162-992d-38df0a2a0e0d" { t.Fatalf("X-Client-Request-Id = %s, want %s", got, "019d2233-e240-7162-992d-38df0a2a0e0d") } - if got := headerValueCaseInsensitive(headers, "session_id"); got != "legacy-session" { - t.Fatalf("session_id = %s, want legacy-session", got) + if got := headers["session_id"]; len(got) != 1 || got[0] != "legacy-session" { + t.Fatalf("session_id = %#v, want [legacy-session]", got) } - if _, ok := headers["session_id"]; !ok { - t.Fatalf("expected lowercase session_id header key, got %#v", headers) + if got := headers.Get("Session-Id"); got != "" { + t.Fatalf("Session-Id = %s, want empty", got) + } +} + +func TestApplyCodexWebsocketHeadersCanonicalizesLegacyUnderscoreSessionHeader(t *testing.T) { + auth := &cliproxyauth.Auth{ + Provider: "codex", + Metadata: map[string]any{"email": "user@example.com"}, + } + ctx := contextWithGinHeaders(map[string]string{ + "Originator": "Codex Desktop", + "User-Agent": "codex_cli_rs/0.1.0", + "Session_id": "legacy-underscore-session", + }) + + headers := applyCodexWebsocketHeaders(ctx, http.Header{}, auth, "", nil) + + if got := headers["session_id"]; len(got) != 1 || got[0] != "legacy-underscore-session" { + t.Fatalf("session_id = %#v, want [legacy-underscore-session]", got) + } + if got := headers.Get("Session-Id"); got != "" { + t.Fatalf("Session-Id = %s, want empty", got) } } @@ -361,22 +382,79 @@ func TestApplyCodexWebsocketHeadersUsesCanonicalAccountHeader(t *testing.T) { } } -func TestApplyCodexPromptCacheHeadersSetsLowercaseSessionAndLegacyConversation(t *testing.T) { +func TestApplyCodexPromptCacheHeadersSetsSessionIDAndLegacyConversation(t *testing.T) { req := cliproxyexecutor.Request{Model: "gpt-5-codex", Payload: []byte(`{"prompt_cache_key":"cache-1"}`)} _, headers := applyCodexPromptCacheHeaders("openai-response", req, []byte(`{"model":"gpt-5-codex"}`)) - if got := headerValueCaseInsensitive(headers, "session_id"); got != "cache-1" { - t.Fatalf("session_id = %s, want cache-1", got) + if got := headers["session_id"]; len(got) != 1 || got[0] != "cache-1" { + t.Fatalf("session_id = %#v, want [cache-1]", got) } - if _, ok := headers["session_id"]; !ok { - t.Fatalf("expected lowercase session_id key, got %#v", headers) + if got := headers.Get("Session-Id"); got != "" { + t.Fatalf("Session-Id = %s, want empty", got) } if got := headers.Get("Conversation_id"); got != "cache-1" { t.Fatalf("Conversation_id = %s, want cache-1", got) } } +func TestApplyCodexPromptCacheHeadersClaudeUsesClaudeCodeSessionID(t *testing.T) { + firstReq := cliproxyexecutor.Request{ + Model: "gpt-5-codex-claude-ws-cache-session", + Payload: []byte(`{ + "metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"ws-cache-session-1\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"first"}]}] + }`), + } + secondReq := cliproxyexecutor.Request{ + Model: "gpt-5-codex-claude-ws-cache-session", + Payload: []byte(`{ + "metadata":{"user_id":"{\"device_id\":\"device-b\",\"account_uuid\":\"\",\"session_id\":\"ws-cache-session-1\"}"}, + "messages":[{"role":"user","content":[{"type":"text","text":"next"}]}] + }`), + } + + firstBody, firstHeaders := applyCodexPromptCacheHeaders("claude", firstReq, []byte(`{"model":"gpt-5-codex"}`)) + secondBody, secondHeaders := applyCodexPromptCacheHeaders("claude", secondReq, []byte(`{"model":"gpt-5-codex"}`)) + + firstKey := gjson.GetBytes(firstBody, "prompt_cache_key").String() + secondKey := gjson.GetBytes(secondBody, "prompt_cache_key").String() + if firstKey == "" { + t.Fatalf("first prompt_cache_key is empty; body=%s", string(firstBody)) + } + if secondKey != firstKey { + t.Fatalf("same Claude Code session_id produced different websocket prompt_cache_key: first=%q second=%q", firstKey, secondKey) + } + if got := firstHeaders["session_id"]; len(got) != 1 || got[0] != firstKey { + t.Fatalf("first session_id = %#v, want [%q]", got, firstKey) + } + if got := secondHeaders["session_id"]; len(got) != 1 || got[0] != firstKey { + t.Fatalf("second session_id = %#v, want [%q]", got, firstKey) + } +} + +func TestApplyCodexPromptCacheHeadersClaudeRejectsBareUserID(t *testing.T) { + req := cliproxyexecutor.Request{ + Model: "gpt-5-codex-claude-ws-cache-bare-user", + Payload: []byte(`{"metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]}`), + } + + body, headers := applyCodexPromptCacheHeaders("claude", req, []byte(`{"model":"gpt-5-codex"}`)) + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "" { + t.Fatalf("bare metadata.user_id must not create websocket prompt_cache_key, got %q; body=%s", got, string(body)) + } + if got := headers["session_id"]; len(got) != 0 { + t.Fatalf("bare metadata.user_id must not create websocket session_id, got %#v", got) + } + if got := headers.Get("Session-Id"); got != "" { + t.Fatalf("bare metadata.user_id must not create websocket Session-Id, got %q", got) + } + if got := headers.Get("Conversation_id"); got != "" { + t.Fatalf("bare metadata.user_id must not create websocket Conversation_id, got %q", got) + } +} + func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testing.T) { cfg := &config.Config{ Routing: config.RoutingConfig{SessionAffinity: true}, @@ -402,8 +480,11 @@ func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testin if gotKey := gjson.GetBytes(body, "prompt_cache_key").String(); gotKey != expectedPromptCacheKey { t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedPromptCacheKey) } - if gotSession := headerValueCaseInsensitive(headers, "session_id"); gotSession != expectedPromptCacheKey { - t.Fatalf("session_id = %q, want %q", gotSession, expectedPromptCacheKey) + if gotSession := headers["session_id"]; len(gotSession) != 1 || gotSession[0] != expectedPromptCacheKey { + t.Fatalf("session_id = %#v, want [%q]", gotSession, expectedPromptCacheKey) + } + if gotCanonicalSession := headers.Get("Session-Id"); gotCanonicalSession != "" { + t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession) } if gotRequestID := headers.Get("X-Client-Request-Id"); gotRequestID != expectedPromptCacheKey { t.Fatalf("X-Client-Request-Id = %q, want %q", gotRequestID, expectedPromptCacheKey)