Merge pull request #3667 from sususu98/feat/codex-reasoning-replay-cache-upstream-dev

feat(codex): cache reasoning replay items
This commit is contained in:
Luis Pater
2026-06-02 19:28:55 +08:00
committed by GitHub
9 changed files with 1916 additions and 58 deletions

View File

@@ -0,0 +1,253 @@
package cache
import (
"sort"
"strings"
"sync"
"time"
"github.com/router-for-me/CLIProxyAPI/v7/internal/signature"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
const (
// CodexReasoningReplayCacheTTL limits how long encrypted reasoning replay
// items stay in process memory.
CodexReasoningReplayCacheTTL = 1 * time.Hour
// CodexReasoningReplayCacheMaxEntries bounds process memory for replay
// continuity. Oldest entries are evicted first.
CodexReasoningReplayCacheMaxEntries = 10240
// CodexReasoningReplayCacheEvictBatchSize leaves headroom after the cache
// reaches capacity so high write volume does not rescan the map every turn.
CodexReasoningReplayCacheEvictBatchSize = 128
)
type codexReasoningReplayEntry struct {
Items [][]byte
Timestamp time.Time
}
var (
codexReasoningReplayMu sync.Mutex
codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry)
)
// CacheCodexReasoningReplayItem stores a final GPT/Codex reasoning item for
// stateless replay. The stored item is normalized to the minimal shape accepted
// by Responses input replay.
func CacheCodexReasoningReplayItem(modelName, sessionKey string, item []byte) bool {
return CacheCodexReasoningReplayItems(modelName, sessionKey, [][]byte{item})
}
// CacheCodexReasoningReplayItems stores the final GPT/Codex assistant output
// items needed to replay a stateless next turn.
func CacheCodexReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return false
}
normalized, ok := normalizeCodexReasoningReplayItems(items)
if !ok {
return false
}
cacheCleanupOnce.Do(startCacheCleanup)
now := time.Now()
codexReasoningReplayMu.Lock()
defer codexReasoningReplayMu.Unlock()
codexReasoningReplayEntries[key] = codexReasoningReplayEntry{
Items: normalized,
Timestamp: now,
}
if len(codexReasoningReplayEntries) > CodexReasoningReplayCacheMaxEntries {
evictOldestCodexReasoningReplayEntries(CodexReasoningReplayCacheEvictBatchSize)
}
return true
}
// GetCodexReasoningReplayItem retrieves a normalized reasoning replay item.
func GetCodexReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) {
items, ok := GetCodexReasoningReplayItems(modelName, sessionKey)
if !ok || len(items) == 0 {
return nil, false
}
return items[0], true
}
// GetCodexReasoningReplayItems retrieves normalized assistant output items.
func GetCodexReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return nil, false
}
cacheCleanupOnce.Do(startCacheCleanup)
now := time.Now()
codexReasoningReplayMu.Lock()
defer codexReasoningReplayMu.Unlock()
entry, ok := codexReasoningReplayEntries[key]
if !ok {
return nil, false
}
if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL {
delete(codexReasoningReplayEntries, key)
return nil, false
}
entry.Timestamp = now
codexReasoningReplayEntries[key] = entry
return cloneCodexReasoningReplayItems(entry.Items), true
}
// DeleteCodexReasoningReplayItem removes one replay item after upstream rejects
// it or the caller otherwise knows it is stale.
func DeleteCodexReasoningReplayItem(modelName, sessionKey string) {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return
}
codexReasoningReplayMu.Lock()
delete(codexReasoningReplayEntries, key)
codexReasoningReplayMu.Unlock()
}
// ClearCodexReasoningReplayCache clears all Codex reasoning replay state.
func ClearCodexReasoningReplayCache() {
codexReasoningReplayMu.Lock()
codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry)
codexReasoningReplayMu.Unlock()
}
func codexReasoningReplayCacheKey(modelName, sessionKey string) string {
modelName = strings.TrimSpace(modelName)
sessionKey = strings.TrimSpace(sessionKey)
if modelName == "" || sessionKey == "" {
return ""
}
// The session key is the continuity boundary. Keep this independent from
// the selected upstream Codex credential so auth failover can preserve replay.
return strings.Join([]string{"codex-reasoning-replay", modelName, sessionKey}, "\x00")
}
func normalizeCodexReasoningReplayItems(items [][]byte) ([][]byte, bool) {
normalized := make([][]byte, 0, len(items))
for _, item := range items {
normalizedItem, ok := normalizeCodexReasoningReplayItem(item)
if ok {
normalized = append(normalized, normalizedItem)
}
}
return normalized, len(normalized) > 0
}
func normalizeCodexReasoningReplayItem(item []byte) ([]byte, bool) {
itemResult := gjson.ParseBytes(item)
switch strings.TrimSpace(itemResult.Get("type").String()) {
case "reasoning":
return normalizeCodexReasoningReplayReasoningItem(itemResult)
case "function_call":
return normalizeCodexReasoningReplayFunctionCallItem(itemResult)
case "custom_tool_call":
return normalizeCodexReasoningReplayCustomToolCallItem(itemResult)
default:
return nil, false
}
}
func normalizeCodexReasoningReplayReasoningItem(itemResult gjson.Result) ([]byte, bool) {
encryptedContentResult := itemResult.Get("encrypted_content")
if encryptedContentResult.Type != gjson.String {
return nil, false
}
encryptedContent := encryptedContentResult.String()
if encryptedContent != strings.TrimSpace(encryptedContent) {
return nil, false
}
if _, err := signature.InspectGPTReasoningSignature(encryptedContent); err != nil {
return nil, false
}
normalized := []byte(`{"type":"reasoning","summary":[],"content":null}`)
normalized, _ = sjson.SetBytes(normalized, "encrypted_content", encryptedContent)
return normalized, true
}
func normalizeCodexReasoningReplayFunctionCallItem(itemResult gjson.Result) ([]byte, bool) {
callID := strings.TrimSpace(itemResult.Get("call_id").String())
name := strings.TrimSpace(itemResult.Get("name").String())
arguments := itemResult.Get("arguments")
if callID == "" || name == "" || arguments.Type != gjson.String {
return nil, false
}
normalized := []byte(`{"type":"function_call"}`)
normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
normalized, _ = sjson.SetBytes(normalized, "name", name)
normalized, _ = sjson.SetBytes(normalized, "arguments", arguments.String())
return normalized, true
}
func normalizeCodexReasoningReplayCustomToolCallItem(itemResult gjson.Result) ([]byte, bool) {
callID := strings.TrimSpace(itemResult.Get("call_id").String())
name := strings.TrimSpace(itemResult.Get("name").String())
input := itemResult.Get("input")
if callID == "" || name == "" || !input.Exists() {
return nil, false
}
normalized := []byte(`{"type":"custom_tool_call","status":"completed"}`)
if status := strings.TrimSpace(itemResult.Get("status").String()); status != "" {
normalized, _ = sjson.SetBytes(normalized, "status", status)
}
normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
normalized, _ = sjson.SetBytes(normalized, "name", name)
if input.Type == gjson.String {
normalized, _ = sjson.SetBytes(normalized, "input", input.String())
} else {
normalized, _ = sjson.SetRawBytes(normalized, "input", []byte(input.Raw))
}
return normalized, true
}
func cloneCodexReasoningReplayItems(items [][]byte) [][]byte {
cloned := make([][]byte, 0, len(items))
for _, item := range items {
cloned = append(cloned, append([]byte(nil), item...))
}
return cloned
}
func evictOldestCodexReasoningReplayEntries(count int) {
if count <= 0 || len(codexReasoningReplayEntries) == 0 {
return
}
type candidate struct {
key string
timestamp time.Time
}
candidates := make([]candidate, 0, len(codexReasoningReplayEntries))
for key, entry := range codexReasoningReplayEntries {
candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp})
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].timestamp.Before(candidates[j].timestamp)
})
if count > len(candidates) {
count = len(candidates)
}
for i := 0; i < count; i++ {
delete(codexReasoningReplayEntries, candidates[i].key)
}
}
func purgeExpiredCodexReasoningReplayCache(now time.Time) {
codexReasoningReplayMu.Lock()
for key, entry := range codexReasoningReplayEntries {
if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL {
delete(codexReasoningReplayEntries, key)
}
}
codexReasoningReplayMu.Unlock()
}

View File

@@ -0,0 +1,73 @@
package cache
import (
"encoding/base64"
"fmt"
"testing"
)
func validCodexReasoningReplayEncryptedContentForTest(seed byte) string {
payload := make([]byte, 1+8+16+16+32)
payload[0] = 0x80
for i := 9; i < len(payload); i++ {
payload[i] = seed + byte(i)
}
return base64.RawURLEncoding.EncodeToString(payload)
}
func TestCodexReasoningReplayCacheRejectsInvalidItems(t *testing.T) {
ClearCodexReasoningReplayCache()
t.Cleanup(ClearCodexReasoningReplayCache)
if CacheCodexReasoningReplayItem("gpt-5.4", "session", []byte(`{"type":"reasoning","encrypted_content":"bad","summary":[]}`)) {
t.Fatal("invalid encrypted_content should not be cached")
}
if _, ok := GetCodexReasoningReplayItem("gpt-5.4", "session"); ok {
t.Fatal("invalid item was cached")
}
}
func TestCodexReasoningReplayCacheScopesByModelAndSession(t *testing.T) {
ClearCodexReasoningReplayCache()
t.Cleanup(ClearCodexReasoningReplayCache)
encryptedContent := validCodexReasoningReplayEncryptedContentForTest(7)
if !CacheCodexReasoningReplayItem("gpt-5.4", "session-a", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}`)) {
t.Fatal("valid item was not cached")
}
if _, ok := GetCodexReasoningReplayItem("gpt-5.5", "session-a"); ok {
t.Fatal("cache should not hit across models")
}
if _, ok := GetCodexReasoningReplayItem("gpt-5.4", "session-b"); ok {
t.Fatal("cache should not hit across sessions")
}
item, ok := GetCodexReasoningReplayItem("gpt-5.4", "session-a")
if !ok {
t.Fatal("cache miss for original model and session")
}
if string(item) != `{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}` {
t.Fatalf("normalized item = %s", string(item))
}
}
func TestCodexReasoningReplayCacheBatchEvictsWhenFull(t *testing.T) {
ClearCodexReasoningReplayCache()
t.Cleanup(ClearCodexReasoningReplayCache)
encryptedContent := validCodexReasoningReplayEncryptedContentForTest(9)
item := []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"` + encryptedContent + `"}`)
for i := 0; i <= CodexReasoningReplayCacheMaxEntries; i++ {
if !CacheCodexReasoningReplayItem("gpt-5.4", fmt.Sprintf("session-%d", i), item) {
t.Fatalf("cache insert %d failed", i)
}
}
codexReasoningReplayMu.Lock()
gotLen := len(codexReasoningReplayEntries)
codexReasoningReplayMu.Unlock()
if gotLen >= CodexReasoningReplayCacheMaxEntries {
t.Fatalf("cache entries = %d, want batch eviction below max %d", gotLen, CodexReasoningReplayCacheMaxEntries)
}
}

View File

@@ -94,6 +94,7 @@ func purgeExpiredCaches() {
}
return true
})
purgeExpiredCodexReasoningReplayCache(now)
}
// CacheSignature stores a thinking signature for a given model group and text.

View File

@@ -4,17 +4,22 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"net/http"
"regexp"
"sort"
"strings"
"time"
codexauth "github.com/router-for-me/CLIProxyAPI/v7/internal/auth/codex"
internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/misc"
"github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps"
"github.com/router-for-me/CLIProxyAPI/v7/internal/signature"
"github.com/router-for-me/CLIProxyAPI/v7/internal/thinking"
"github.com/router-for-me/CLIProxyAPI/v7/internal/util"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
@@ -36,6 +41,7 @@ const (
)
var dataTag = []byte("data:")
var codexClaudeCodeSessionPattern = regexp.MustCompile(`_session_([a-f0-9-]+)$`)
// Streamed Codex responses may emit response.output_item.done events while leaving
// response.completed.response.output empty. Keep the stream path aligned with the
@@ -101,6 +107,14 @@ func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][]
}
func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) {
streamErr, body, ok := codexTerminalStreamErr(eventData)
if !ok || !codexTerminalErrorIsContextLength(body) {
return statusErr{}, false
}
return streamErr, true
}
func codexTerminalStreamErr(eventData []byte) (statusErr, []byte, bool) {
eventType := gjson.GetBytes(eventData, "type").String()
var body []byte
switch eventType {
@@ -115,15 +129,23 @@ func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) {
body = codexTerminalErrorBody(eventData, "error")
}
default:
return statusErr{}, false
return statusErr{}, nil, false
}
if len(body) == 0 {
return statusErr{}, false
return statusErr{}, nil, false
}
if !codexTerminalErrorIsContextLength(body) {
return statusErr{}, false
if !codexTerminalStreamErrShouldHandle(body) {
return statusErr{}, nil, false
}
return newCodexStatusErr(http.StatusBadRequest, body), true
return newCodexStatusErr(http.StatusBadRequest, body), body, true
}
func codexTerminalStreamErrShouldHandle(body []byte) bool {
if codexTerminalErrorIsContextLength(body) {
return true
}
code, _, ok := codexStatusErrorClassification(http.StatusBadRequest, body)
return ok && code == "thinking_signature_invalid"
}
func codexTerminalErrorBody(eventData []byte, path string) []byte {
@@ -217,6 +239,482 @@ func translateCodexRequestPair(from, to sdktranslator.Format, model string, orig
return originalTranslated, body
}
type codexReasoningReplayScope struct {
modelName string
sessionKey string
}
func (s codexReasoningReplayScope) valid() bool {
return strings.TrimSpace(s.modelName) != "" && strings.TrimSpace(s.sessionKey) != ""
}
func applyCodexReasoningReplayCache(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) ([]byte, codexReasoningReplayScope) {
scope := codexReasoningReplayScopeFromRequest(ctx, from, req, opts, body)
if !scope.valid() {
return body, scope
}
items, ok := internalcache.GetCodexReasoningReplayItems(scope.modelName, scope.sessionKey)
if !ok {
return body, scope
}
items = filterCodexReasoningReplayItemsForInput(body, items)
if len(items) == 0 {
return body, scope
}
updated, ok := insertCodexReasoningReplayItems(body, items)
if !ok {
return body, scope
}
return updated, scope
}
func codexReasoningReplayScopeFromRequest(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) codexReasoningReplayScope {
if !codexReasoningReplayEnabledForSource(from) {
return codexReasoningReplayScope{}
}
return codexReasoningReplayScope{
modelName: thinking.ParseSuffix(req.Model).ModelName,
sessionKey: codexReasoningReplaySessionKey(ctx, from, req, opts, body),
}
}
func codexReasoningReplayEnabledForSource(from sdktranslator.Format) bool {
return sourceFormatEqual(from, sdktranslator.FormatClaude)
}
func sourceFormatEqual(from, want sdktranslator.Format) bool {
return strings.EqualFold(strings.TrimSpace(from.String()), want.String())
}
func codexClaudeCodeReplaySessionKey(payload []byte) string {
sessionID := extractClaudeCodeSessionIDForCodexReplay(payload)
if sessionID == "" {
return ""
}
return "claude:" + sessionID
}
func codexClaudeCodePromptCacheStorageKey(req cliproxyexecutor.Request) string {
sessionID := extractClaudeCodeSessionIDForCodexReplay(req.Payload)
if sessionID == "" {
return ""
}
return fmt.Sprintf("%s-claude:%s", req.Model, sessionID)
}
func codexClaudeCodePromptCache(req cliproxyexecutor.Request) (helps.CodexCache, bool) {
key := codexClaudeCodePromptCacheStorageKey(req)
if key == "" {
return helps.CodexCache{}, false
}
if cache, ok := helps.GetCodexCache(key); ok {
return cache, true
}
cache := helps.CodexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
helps.SetCodexCache(key, cache)
return cache, true
}
func extractClaudeCodeSessionIDForCodexReplay(payload []byte) string {
if len(payload) == 0 {
return ""
}
userID := gjson.GetBytes(payload, "metadata.user_id").String()
if userID == "" {
return ""
}
if matches := codexClaudeCodeSessionPattern.FindStringSubmatch(userID); len(matches) >= 2 {
return matches[1]
}
if len(userID) > 0 && userID[0] == '{' {
return gjson.Get(userID, "session_id").String()
}
return ""
}
func codexReasoningReplaySessionKey(ctx context.Context, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, body []byte) string {
if ctx == nil {
ctx = context.Background()
}
if value := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
return "execution:" + value
}
if value := metadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
return "execution:" + value
}
if value := codexReasoningReplaySessionKeyFromPayload(body); value != "" {
return value
}
if value := codexReasoningReplaySessionKeyFromPayload(req.Payload); value != "" {
return value
}
if value := codexReasoningReplaySessionKeyFromHeaders(opts.Headers); value != "" {
return value
}
if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil {
if value := codexReasoningReplaySessionKeyFromHeaders(ginCtx.Request.Header); value != "" {
return value
}
}
if sourceFormatEqual(from, sdktranslator.FormatClaude) {
return codexClaudeCodeReplaySessionKey(req.Payload)
}
if sourceFormatEqual(from, sdktranslator.FormatOpenAI) {
if apiKey := strings.TrimSpace(helps.APIKeyFromContext(ctx)); apiKey != "" {
return "prompt-cache:" + uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String()
}
}
return ""
}
func metadataString(metadata map[string]any, key string) string {
if len(metadata) == 0 {
return ""
}
raw, ok := metadata[key]
if !ok || raw == nil {
return ""
}
switch v := raw.(type) {
case string:
return strings.TrimSpace(v)
case []byte:
return strings.TrimSpace(string(v))
default:
return ""
}
}
func codexReasoningReplaySessionKeyFromPayload(payload []byte) string {
if len(payload) == 0 {
return ""
}
if promptCacheKey := strings.TrimSpace(gjson.GetBytes(payload, "prompt_cache_key").String()); promptCacheKey != "" {
return "prompt-cache:" + promptCacheKey
}
if windowID := strings.TrimSpace(gjson.GetBytes(payload, "client_metadata.x-codex-window-id").String()); windowID != "" {
return "window:" + windowID
}
if turnMetadata := strings.TrimSpace(gjson.GetBytes(payload, "client_metadata.x-codex-turn-metadata").String()); turnMetadata != "" {
return codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata)
}
return ""
}
func codexReasoningReplaySessionKeyFromHeaders(headers http.Header) string {
if headers == nil {
return ""
}
if turnMetadata := strings.TrimSpace(headers.Get("X-Codex-Turn-Metadata")); turnMetadata != "" {
if key := codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata); key != "" {
return key
}
}
if windowID := strings.TrimSpace(headerValueCaseInsensitive(headers, "X-Codex-Window-Id")); windowID != "" {
return "window:" + windowID
}
for _, headerName := range []string{"Session_id", "session_id", "Session-Id"} {
if value := strings.TrimSpace(headerValueCaseInsensitive(headers, headerName)); value != "" {
return "session-id:" + value
}
}
if conversationID := strings.TrimSpace(headerValueCaseInsensitive(headers, "Conversation_id")); conversationID != "" {
return "conversation_id:" + conversationID
}
return ""
}
func codexReasoningReplaySessionKeyFromTurnMetadata(turnMetadata string) string {
if promptCacheKey := strings.TrimSpace(gjson.Get(turnMetadata, "prompt_cache_key").String()); promptCacheKey != "" {
return "prompt-cache:" + promptCacheKey
}
if windowID := strings.TrimSpace(gjson.Get(turnMetadata, "window_id").String()); windowID != "" {
return "window:" + windowID
}
return ""
}
func codexInputHasValidReasoningEncryptedContent(body []byte) bool {
input := gjson.GetBytes(body, "input")
if !input.IsArray() {
return false
}
for _, item := range input.Array() {
if strings.TrimSpace(item.Get("type").String()) != "reasoning" {
continue
}
encryptedContent := item.Get("encrypted_content")
if encryptedContent.Type != gjson.String {
continue
}
if _, err := signature.InspectGPTReasoningSignature(encryptedContent.String()); err == nil {
return true
}
}
return false
}
func filterCodexReasoningReplayItemsForInput(body []byte, items [][]byte) [][]byte {
input := gjson.GetBytes(body, "input")
if !input.IsArray() {
return nil
}
hasInputReasoning := codexInputHasValidReasoningEncryptedContent(body)
existingCalls := make(map[string]bool)
for _, inputItem := range input.Array() {
for _, key := range codexReplayToolCallKeys(inputItem) {
existingCalls[key] = true
}
}
filtered := make([][]byte, 0, len(items))
for _, item := range items {
itemResult := gjson.ParseBytes(item)
switch strings.TrimSpace(itemResult.Get("type").String()) {
case "reasoning":
if hasInputReasoning {
continue
}
case "function_call", "custom_tool_call":
keys := codexReplayToolCallKeys(itemResult)
if len(keys) == 0 || codexReplayAnyToolCallKeyExists(existingCalls, keys) {
continue
}
for _, key := range keys {
existingCalls[key] = true
}
default:
continue
}
filtered = append(filtered, item)
}
return filtered
}
func insertCodexReasoningReplayItems(body []byte, replayItems [][]byte) ([]byte, bool) {
input := gjson.GetBytes(body, "input")
if !input.IsArray() || len(replayItems) == 0 {
return body, false
}
inputItems := input.Array()
insertIndex := codexReasoningReplayInsertIndex(inputItems, replayItems)
replayItems = codexAlignReasoningReplayToolCallIDs(inputItems, replayItems)
items := make([]string, 0, len(inputItems)+len(replayItems))
for i, inputItem := range inputItems {
if i == insertIndex {
for _, replayItem := range replayItems {
items = append(items, string(replayItem))
}
}
items = append(items, inputItem.Raw)
}
if insertIndex == len(inputItems) {
for _, replayItem := range replayItems {
items = append(items, string(replayItem))
}
}
updated, err := sjson.SetRawBytes(body, "input", []byte("["+strings.Join(items, ",")+"]"))
if err != nil {
return body, false
}
return updated, true
}
func codexReasoningReplayInsertIndex(inputItems []gjson.Result, replayItems [][]byte) int {
replayCallIDs := make(map[string]bool)
for _, replayItem := range replayItems {
itemResult := gjson.ParseBytes(replayItem)
itemType := strings.TrimSpace(itemResult.Get("type").String())
if itemType != "function_call" && itemType != "custom_tool_call" {
continue
}
for _, callID := range codexReplayComparableCallIDs(itemResult.Get("call_id").String()) {
replayCallIDs[callID] = true
}
}
if len(replayCallIDs) > 0 {
for index, inputItem := range inputItems {
itemType := strings.TrimSpace(inputItem.Get("type").String())
if itemType != "function_call_output" && itemType != "custom_tool_call_output" {
continue
}
callID := strings.TrimSpace(inputItem.Get("call_id").String())
if callID == "" || replayCallIDs[callID] {
return index
}
}
}
for index := len(inputItems) - 1; index >= 0; index-- {
inputItem := inputItems[index]
if strings.TrimSpace(inputItem.Get("type").String()) == "message" && strings.TrimSpace(inputItem.Get("role").String()) == "assistant" {
return index
}
}
for index, inputItem := range inputItems {
if shouldInsertCodexReasoningReplayBefore(inputItem) {
return index
}
}
return len(inputItems)
}
func codexAlignReasoningReplayToolCallIDs(inputItems []gjson.Result, replayItems [][]byte) [][]byte {
outputCallIDs := codexReplayOutputCallIDs(inputItems)
if len(outputCallIDs) == 0 {
return replayItems
}
aligned := make([][]byte, 0, len(replayItems))
for _, replayItem := range replayItems {
itemResult := gjson.ParseBytes(replayItem)
itemType := strings.TrimSpace(itemResult.Get("type").String())
if itemType != "function_call" && itemType != "custom_tool_call" {
aligned = append(aligned, replayItem)
continue
}
callID := strings.TrimSpace(itemResult.Get("call_id").String())
outputCallID := ""
for _, candidate := range codexReplayComparableCallIDs(callID) {
if value := outputCallIDs[candidate]; value != "" {
outputCallID = value
break
}
}
if outputCallID == "" || outputCallID == callID {
aligned = append(aligned, replayItem)
continue
}
updated, err := sjson.SetBytes(replayItem, "call_id", outputCallID)
if err != nil {
aligned = append(aligned, replayItem)
continue
}
aligned = append(aligned, updated)
}
return aligned
}
func codexReplayOutputCallIDs(inputItems []gjson.Result) map[string]string {
outputCallIDs := make(map[string]string)
for _, inputItem := range inputItems {
itemType := strings.TrimSpace(inputItem.Get("type").String())
if itemType != "function_call_output" && itemType != "custom_tool_call_output" {
continue
}
callID := strings.TrimSpace(inputItem.Get("call_id").String())
if callID == "" {
continue
}
for _, candidate := range codexReplayComparableCallIDs(callID) {
outputCallIDs[candidate] = callID
}
}
return outputCallIDs
}
func shouldInsertCodexReasoningReplayBefore(item gjson.Result) bool {
if strings.TrimSpace(item.Get("type").String()) != "message" {
return true
}
switch strings.TrimSpace(item.Get("role").String()) {
case "developer", "system":
return false
default:
return true
}
}
func codexReplayToolCallKeys(item gjson.Result) []string {
itemType := strings.TrimSpace(item.Get("type").String())
if itemType != "function_call" && itemType != "custom_tool_call" {
return nil
}
callIDs := codexReplayComparableCallIDs(item.Get("call_id").String())
if len(callIDs) == 0 {
return nil
}
keys := make([]string, 0, len(callIDs))
for _, callID := range callIDs {
keys = append(keys, itemType+":"+callID)
}
return keys
}
func codexReplayAnyToolCallKeyExists(existing map[string]bool, keys []string) bool {
for _, key := range keys {
if existing[key] {
return true
}
}
return false
}
func codexReplayComparableCallIDs(callID string) []string {
callID = strings.TrimSpace(callID)
if callID == "" {
return nil
}
claudeVisibleCallID := shortenCodexReplayCallIDIfNeeded(util.SanitizeClaudeToolID(callID))
if claudeVisibleCallID == "" || claudeVisibleCallID == callID {
return []string{callID}
}
return []string{callID, claudeVisibleCallID}
}
func shortenCodexReplayCallIDIfNeeded(id string) string {
const limit = 64
if len(id) <= limit {
return id
}
sum := sha256.Sum256([]byte(id))
suffix := "_" + hex.EncodeToString(sum[:8])
prefixLen := limit - len(suffix)
if prefixLen <= 0 {
return suffix[len(suffix)-limit:]
}
return id[:prefixLen] + suffix
}
func cacheCodexReasoningReplayFromCompleted(scope codexReasoningReplayScope, completedData []byte) {
if !scope.valid() {
return
}
output := gjson.GetBytes(completedData, "response.output")
if !output.IsArray() {
return
}
items := make([][]byte, 0, len(output.Array()))
for _, item := range output.Array() {
switch strings.TrimSpace(item.Get("type").String()) {
case "reasoning", "function_call", "custom_tool_call":
items = append(items, []byte(item.Raw))
default:
continue
}
}
if !internalcache.CacheCodexReasoningReplayItems(scope.modelName, scope.sessionKey, items) {
internalcache.DeleteCodexReasoningReplayItem(scope.modelName, scope.sessionKey)
}
}
func clearCodexReasoningReplayOnInvalidSignature(scope codexReasoningReplayScope, statusCode int, body []byte) {
if !scope.valid() {
return
}
code, _, ok := codexStatusErrorClassification(statusCode, body)
if ok && code == "thinking_signature_invalid" {
internalcache.DeleteCodexReasoningReplayItem(scope.modelName, scope.sessionKey)
}
}
// PrepareRequest injects Codex credentials into the outgoing HTTP request.
func (e *CodexExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
if req == nil {
@@ -295,6 +793,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
body = ensureImageGenerationTool(body, baseModel, auth)
}
body = sanitizeOpenAIResponsesReasoningEncryptedContent(ctx, "codex executor", body)
body, replayScope := applyCodexReasoningReplayCache(ctx, from, req, opts, body)
reporter.SetTranslatedReasoningEffort(body, to.String())
url := strings.TrimSuffix(baseURL, "/") + "/responses"
@@ -338,6 +837,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
b = applyCodexIdentityConfuseResponsePayload(b, identityState)
clearCodexReasoningReplayOnInvalidSignature(replayScope, httpResp.StatusCode, b)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
err = newCodexStatusErr(httpResp.StatusCode, b)
@@ -362,7 +862,8 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
eventData := bytes.TrimSpace(line[5:])
eventType := gjson.GetBytes(eventData, "type").String()
if streamErr, ok := codexTerminalStreamContextLengthErr(eventData); ok {
if streamErr, terminalBody, ok := codexTerminalStreamErr(eventData); ok {
clearCodexReasoningReplayOnInvalidSignature(replayScope, streamErr.StatusCode(), terminalBody)
err = streamErr
return resp, err
}
@@ -412,6 +913,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
}
completedData = completedDataPatched
}
cacheCodexReasoningReplayFromCompleted(replayScope, completedData)
var param any
clientCompletedData := applyCodexIdentityExposeResponsePayload(completedData, identityState)
@@ -566,6 +1068,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
body = ensureImageGenerationTool(body, baseModel, auth)
}
body = sanitizeOpenAIResponsesReasoningEncryptedContent(ctx, "codex executor", body)
body, replayScope := applyCodexReasoningReplayCache(ctx, from, req, opts, body)
reporter.SetTranslatedReasoningEffort(body, to.String())
url := strings.TrimSuffix(baseURL, "/") + "/responses"
@@ -612,6 +1115,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
return nil, readErr
}
data = applyCodexIdentityConfuseResponsePayload(data, identityState)
clearCodexReasoningReplayOnInvalidSignature(replayScope, httpResp.StatusCode, data)
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
err = newCodexStatusErr(httpResp.StatusCode, data)
@@ -637,7 +1141,8 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
if bytes.HasPrefix(line, dataTag) {
data := bytes.TrimSpace(line[5:])
if streamErr, ok := codexTerminalStreamContextLengthErr(data); ok {
if streamErr, terminalBody, ok := codexTerminalStreamErr(data); ok {
clearCodexReasoningReplayOnInvalidSignature(replayScope, streamErr.StatusCode(), terminalBody)
helps.RecordAPIResponseError(ctx, e.cfg, streamErr)
reporter.PublishFailure(ctx, streamErr)
select {
@@ -655,6 +1160,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
}
publishCodexImageToolUsage(ctx, reporter, body, data)
data = patchCodexCompletedOutput(data, outputItemsByIndex, outputItemsFallback)
cacheCodexReasoningReplayFromCompleted(replayScope, data)
translatedLine = append([]byte("data: "), data...)
}
}
@@ -895,25 +1401,16 @@ type codexIdentityReplacement struct {
func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Format, url string, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, userPayload []byte, rawJSON []byte) (*http.Request, []byte, codexIdentityConfuseState, error) {
var cache helps.CodexCache
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
var ok bool
if cache, ok = helps.GetCodexCache(key); !ok {
cache = helps.CodexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
helps.SetCodexCache(key, cache)
}
if sourceFormatEqual(from, sdktranslator.FormatClaude) {
if cached, ok := codexClaudeCodePromptCache(req); ok {
cache = cached
}
} else if from == "openai-response" {
} else if sourceFormatEqual(from, sdktranslator.FormatOpenAIResponse) {
promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key")
if promptCacheKey.Exists() {
cache.ID = promptCacheKey.String()
}
} else if from == "openai" {
} else if sourceFormatEqual(from, sdktranslator.FormatOpenAI) {
if apiKey := strings.TrimSpace(helps.APIKeyFromContext(ctx)); apiKey != "" {
cache.ID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String()
}
@@ -978,10 +1475,7 @@ func applyCodexIdentityConfuseHeaders(headers http.Header, state *codexIdentityC
return
}
setHeaderCasePreserved(headers, "Session-Id", state.promptCacheKey)
if headerValueCaseInsensitive(headers, "session_id") != "" {
setHeaderCasePreserved(headers, "session_id", state.promptCacheKey)
}
setCodexSessionHeaderCasePreserved(headers, "Session_id", state.promptCacheKey)
if headerValueCaseInsensitive(headers, "Conversation_id") != "" {
setHeaderCasePreserved(headers, "Conversation_id", state.promptCacheKey)
}

View File

@@ -47,8 +47,11 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom
if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != "" {
t.Fatalf("Conversation_id = %q, want empty", gotConversation)
}
if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey {
t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey)
if gotSession := httpReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != expectedKey {
t.Fatalf("Session_id = %#v, want [%q]", gotSession, expectedKey)
}
if gotCanonicalSession := httpReq.Header.Get("Session-Id"); gotCanonicalSession != "" {
t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession)
}
httpReq2, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, nil, req, req.Payload, rawJSON)
@@ -65,6 +68,88 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom
}
}
func TestCodexExecutorCacheHelper_ClaudeUsesClaudeCodeSessionID(t *testing.T) {
executor := &CodexExecutor{}
ctx := context.Background()
url := "https://example.com/responses"
rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`)
firstReq := cliproxyexecutor.Request{
Model: "gpt-5.4-claude-cache-session",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"cache-session-1\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]
}`),
}
secondReq := cliproxyexecutor.Request{
Model: "gpt-5.4-claude-cache-session",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-b\",\"account_uuid\":\"\",\"session_id\":\"cache-session-1\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]
}`),
}
firstHTTPReq, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("claude"), url, nil, firstReq, firstReq.Payload, rawJSON)
if err != nil {
t.Fatalf("cacheHelper first error: %v", err)
}
secondHTTPReq, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("claude"), url, nil, secondReq, secondReq.Payload, rawJSON)
if err != nil {
t.Fatalf("cacheHelper second error: %v", err)
}
firstBody, errRead := io.ReadAll(firstHTTPReq.Body)
if errRead != nil {
t.Fatalf("read first request body: %v", errRead)
}
secondBody, errRead := io.ReadAll(secondHTTPReq.Body)
if errRead != nil {
t.Fatalf("read second request body: %v", errRead)
}
firstKey := gjson.GetBytes(firstBody, "prompt_cache_key").String()
secondKey := gjson.GetBytes(secondBody, "prompt_cache_key").String()
if firstKey == "" {
t.Fatalf("first prompt_cache_key is empty; body=%s", string(firstBody))
}
if secondKey != firstKey {
t.Fatalf("same Claude Code session_id produced different prompt_cache_key: first=%q second=%q", firstKey, secondKey)
}
if gotSession := firstHTTPReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != firstKey {
t.Fatalf("first Session_id = %#v, want [%q]", gotSession, firstKey)
}
if gotSession := secondHTTPReq.Header["Session_id"]; len(gotSession) != 1 || gotSession[0] != firstKey {
t.Fatalf("second Session_id = %#v, want [%q]", gotSession, firstKey)
}
}
func TestCodexExecutorCacheHelper_ClaudeRejectsBareUserID(t *testing.T) {
executor := &CodexExecutor{}
req := cliproxyexecutor.Request{
Model: "gpt-5.4-claude-cache-bare-user",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]}`),
}
httpReq, _, _, err := executor.cacheHelper(context.Background(), sdktranslator.FromString("claude"), "https://example.com/responses", nil, req, req.Payload, []byte(`{"model":"gpt-5.4","stream":true}`))
if err != nil {
t.Fatalf("cacheHelper error: %v", err)
}
body, errRead := io.ReadAll(httpReq.Body)
if errRead != nil {
t.Fatalf("read request body: %v", errRead)
}
if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "" {
t.Fatalf("bare metadata.user_id must not create prompt_cache_key, got %q; body=%s", got, string(body))
}
if got := httpReq.Header["Session_id"]; len(got) != 0 {
t.Fatalf("bare metadata.user_id must not create Session_id, got %#v", got)
}
if got := httpReq.Header.Get("Session-Id"); got != "" {
t.Fatalf("bare metadata.user_id must not create Session-Id, got %q", got)
}
}
func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing.T) {
recorder := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(recorder)
@@ -114,13 +199,16 @@ func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing
if gotWindowID := gjson.GetBytes(body, "client_metadata.x-codex-window-id").String(); gotWindowID != expectedPromptCacheKey+":0" {
t.Fatalf("client_metadata.x-codex-window-id = %q, want %q", gotWindowID, expectedPromptCacheKey+":0")
}
for _, headerName := range []string{"Session-Id", "X-Client-Request-Id", "Thread-Id"} {
if gotHeader := httpReq.Header["Session_id"]; len(gotHeader) != 1 || gotHeader[0] != expectedPromptCacheKey {
t.Fatalf("Session_id = %#v, want [%q]", gotHeader, expectedPromptCacheKey)
}
for _, headerName := range []string{"X-Client-Request-Id", "Thread-Id"} {
if gotHeader := httpReq.Header.Get(headerName); gotHeader != expectedPromptCacheKey {
t.Fatalf("%s = %q, want %q", headerName, gotHeader, expectedPromptCacheKey)
}
}
if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedPromptCacheKey {
t.Fatalf("Session_id = %q, want %q", gotSession, expectedPromptCacheKey)
if gotCanonicalSession := httpReq.Header.Get("Session-Id"); gotCanonicalSession != "" {
t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession)
}
if gotWindow := httpReq.Header.Get("X-Codex-Window-Id"); gotWindow != expectedPromptCacheKey+":0" {
t.Fatalf("X-Codex-Window-Id = %q, want %q", gotWindow, expectedPromptCacheKey+":0")

View File

@@ -0,0 +1,803 @@
package executor
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/gin-gonic/gin"
internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
_ "github.com/router-for-me/CLIProxyAPI/v7/internal/translator"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator"
"github.com/tidwall/gjson"
)
func validCodexReasoningEncryptedContentForTestSeed(seed byte) string {
payload := make([]byte, 1+8+16+16+32)
payload[0] = 0x80
for i := 9; i < len(payload); i++ {
payload[i] = seed + byte(i)
}
return base64.RawURLEncoding.EncodeToString(payload)
}
func shortenedCodexReplayCallIDForTest(id string) string {
const limit = 64
if len(id) <= limit {
return id
}
sum := sha256.Sum256([]byte(id))
suffix := "_" + hex.EncodeToString(sum[:8])
prefixLen := limit - len(suffix)
if prefixLen <= 0 {
return suffix[len(suffix)-limit:]
}
return id[:prefixLen] + suffix
}
func TestCodexExecutorReasoningReplayCacheStoresFinalDoneAndInjectsNextClaudeRequest(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
addedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(1)
doneEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(2)
var bodies [][]byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
bodies = append(bodies, body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"rs_added","type":"reasoning","status":"in_progress","summary":[],"encrypted_content":"` + addedEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + doneEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[],"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{
ID: "auth-replay-1",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}
opts := cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
}
_, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`),
}, opts)
if err != nil {
t.Fatalf("first Execute error: %v", err)
}
_, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, opts)
if err != nil {
t.Fatalf("second Execute error: %v", err)
}
if len(bodies) != 2 {
t.Fatalf("upstream request count = %d, want 2", len(bodies))
}
secondBody := bodies[1]
if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" {
t.Fatalf("input.0.type = %q, want reasoning; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != doneEncryptedContent {
t.Fatalf("injected encrypted_content = %q, want final done %q; body=%s", got, doneEncryptedContent, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.1.role").String(); got != "user" {
t.Fatalf("input.1.role = %q, want user; body=%s", got, string(secondBody))
}
}
func TestCodexExecutorReasoningReplayCacheSharesSameSessionAcrossClientKeys(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
from := sdktranslator.FromString("claude")
req := cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-only\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}
opts := cliproxyexecutor.Options{SourceFormat: from}
body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`)
encryptedContent := validCodexReasoningEncryptedContentForTestSeed(11)
firstScope := codexReasoningReplayScopeFromRequest(codexReplaySessionOnlyContext("client-key-a"), from, req, opts, body)
if !firstScope.valid() {
t.Fatalf("first replay scope is invalid: %#v", firstScope)
}
cacheCodexReasoningReplayFromCompleted(firstScope, []byte(`{"response":{"output":[{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+encryptedContent+`"}]}}`))
secondBody, secondScope := applyCodexReasoningReplayCache(codexReplaySessionOnlyContext("client-key-b"), from, req, opts, body)
if secondScope != firstScope {
t.Fatalf("replay scope should ignore client API key for the same session: first=%#v second=%#v", firstScope, secondScope)
}
if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" {
t.Fatalf("input.0.type = %q, want same-session replay; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != encryptedContent {
t.Fatalf("injected encrypted_content = %q, want cached value", got)
}
}
func TestCodexExecutorReasoningReplaySessionKeyUsesClaudeCodeJSONSessionID(t *testing.T) {
from := sdktranslator.FromString("claude")
req := cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"session-json-1\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]
}`),
}
body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`)
got := codexReasoningReplaySessionKey(context.Background(), from, req, cliproxyexecutor.Options{SourceFormat: from}, body)
if got != "claude:session-json-1" {
t.Fatalf("codexReasoningReplaySessionKey() = %q, want claude:session-json-1", got)
}
}
func TestCodexExecutorReasoningReplaySessionKeyRejectsBareClaudeUserID(t *testing.T) {
from := sdktranslator.FromString("claude")
req := cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}
body := []byte(`{"model":"gpt-5.4","input":[{"type":"message","role":"user","content":[{"type":"input_text","text":"next"}]}]}`)
got := codexReasoningReplaySessionKey(context.Background(), from, req, cliproxyexecutor.Options{SourceFormat: from}, body)
if got != "" {
t.Fatalf("bare metadata.user_id must not become replay session key, got %q", got)
}
}
func TestCodexExecutorReasoningReplaySessionKeyCanonicalizesSessionHeaderAliases(t *testing.T) {
legacy := http.Header{"Session_id": []string{"session-alias"}}
lowercase := http.Header{"session_id": []string{"session-alias"}}
canonical := http.Header{"Session-Id": []string{"session-alias"}}
gotLegacy := codexReasoningReplaySessionKeyFromHeaders(legacy)
gotLowercase := codexReasoningReplaySessionKeyFromHeaders(lowercase)
gotCanonical := codexReasoningReplaySessionKeyFromHeaders(canonical)
if gotLegacy != gotLowercase || gotLowercase != gotCanonical {
t.Fatalf("session header aliases produced different keys: legacy=%q lowercase=%q canonical=%q", gotLegacy, gotLowercase, gotCanonical)
}
if gotCanonical != "session-id:session-alias" {
t.Fatalf("canonical session key = %q, want session-id:session-alias", gotCanonical)
}
}
func TestCodexExecutorReasoningReplaySessionKeyCanonicalizesWindowHeaderWithPayload(t *testing.T) {
payload := []byte(`{"client_metadata":{"x-codex-window-id":"window-1"}}`)
headers := http.Header{"X-Codex-Window-Id": []string{"window-1"}}
gotPayload := codexReasoningReplaySessionKeyFromPayload(payload)
gotHeader := codexReasoningReplaySessionKeyFromHeaders(headers)
if gotPayload != gotHeader {
t.Fatalf("window replay keys differ: payload=%q header=%q", gotPayload, gotHeader)
}
if gotHeader != "window:window-1" {
t.Fatalf("window replay key = %q, want window:window-1", gotHeader)
}
}
func TestCodexExecutorReasoningReplayCacheSharesSameSessionAcrossCodexAuths(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
encryptedContent := validCodexReasoningEncryptedContentForTestSeed(12)
var bodies [][]byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
bodies = append(bodies, body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + encryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
firstAuth := &cliproxyauth.Auth{
ID: "auth-replay-session-auth-a",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test-a",
},
}
secondAuth := &cliproxyauth.Auth{
ID: "auth-replay-session-auth-b",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test-b",
},
}
opts := cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
}
_, err := executor.Execute(context.Background(), firstAuth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-auth-switch\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`),
}, opts)
if err != nil {
t.Fatalf("first Execute error: %v", err)
}
_, err = executor.Execute(context.Background(), secondAuth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-auth-switch\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, opts)
if err != nil {
t.Fatalf("second Execute error: %v", err)
}
if len(bodies) != 2 {
t.Fatalf("upstream request count = %d, want 2", len(bodies))
}
secondBody := bodies[1]
if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "reasoning" {
t.Fatalf("input.0.type = %q, want same-session replay across auths; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != encryptedContent {
t.Fatalf("injected encrypted_content = %q, want cached value", got)
}
}
func codexReplaySessionOnlyContext(apiKey string) context.Context {
recorder := httptest.NewRecorder()
ginCtx, _ := gin.CreateTestContext(recorder)
ginCtx.Set("userApiKey", apiKey)
ginCtx.Set("accessProvider", "config-inline")
ginCtx.Request = httptest.NewRequest("POST", "/v1/messages", nil)
return context.WithValue(context.Background(), "gin", ginCtx)
}
func TestCodexExecutorReasoningReplayCacheDoesNotInjectNativeResponsesRequest(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(3)
internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "prompt-cache:native-session", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`))
var gotBody []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
gotBody = body
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
_, err := executor.Execute(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-native",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","prompt_cache_key":"native-session","input":[{"role":"user","content":"native"}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai-response"),
Stream: false,
})
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if got := gjson.GetBytes(gotBody, "input.0.type").String(); got == "reasoning" {
t.Fatalf("native Responses request should not receive cached reasoning; body=%s", string(gotBody))
}
if got := gjson.GetBytes(gotBody, "input.0.role").String(); got != "user" {
t.Fatalf("input.0.role = %q, want user; body=%s", got, string(gotBody))
}
}
func TestCodexExecutorReasoningReplayCacheDoesNotStoreNativeResponsesRequest(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
nativeEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(4)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.ReadAll(r.Body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[{"id":"rs_native","type":"reasoning","summary":[],"encrypted_content":"` + nativeEncryptedContent + `"}]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
_, err := executor.Execute(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-native-store",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","prompt_cache_key":"native-store","input":[{"role":"user","content":"native"}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai-response"),
Stream: false,
})
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "prompt-cache:native-store"); ok {
t.Fatal("native Responses request should not populate Codex reasoning replay cache")
}
}
func TestCodexExecutorReasoningReplayCacheDoesNotDuplicateClaudeClientReasoning(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(5)
clientEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(6)
internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-2", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`))
var gotBody []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
gotBody = body
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
_, err := executor.Execute(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-2",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-2\"}"},"messages":[{"role":"assistant","content":[{"type":"thinking","thinking":"client summary","signature":"` + clientEncryptedContent + `"},{"type":"text","text":"answer"}]},{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
})
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if got := gjson.GetBytes(gotBody, "input.0.encrypted_content").String(); got != clientEncryptedContent {
t.Fatalf("client reasoning should be preserved, got %q want %q; body=%s", got, clientEncryptedContent, string(gotBody))
}
reasoningCount := 0
for _, item := range gjson.GetBytes(gotBody, "input").Array() {
if item.Get("type").String() == "reasoning" {
reasoningCount++
}
}
if reasoningCount != 1 {
t.Fatalf("reasoning item count = %d, want 1; body=%s", reasoningCount, string(gotBody))
}
}
func TestCodexExecutorReasoningReplayCacheInsertsReasoningBeforeAssistantOutputInClaudeHistory(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(7)
internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-history", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`))
var gotBody []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
gotBody = body
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
_, err := executor.Execute(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-history",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-history\"}"},
"messages":[
{"role":"user","content":[{"type":"text","text":"first"}]},
{"role":"assistant","content":[{"type":"text","text":"answer"}]},
{"role":"user","content":[{"type":"text","text":"next"}]}
]
}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
})
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if got := gjson.GetBytes(gotBody, "input.0.role").String(); got != "user" {
t.Fatalf("input.0.role = %q, want first user message; body=%s", got, string(gotBody))
}
if got := gjson.GetBytes(gotBody, "input.1.type").String(); got != "reasoning" {
t.Fatalf("input.1.type = %q, want cached reasoning before assistant output; body=%s", got, string(gotBody))
}
if got := gjson.GetBytes(gotBody, "input.1.encrypted_content").String(); got != cachedEncryptedContent {
t.Fatalf("input.1.encrypted_content = %q, want cached reasoning; body=%s", got, string(gotBody))
}
if got := gjson.GetBytes(gotBody, "input.2.role").String(); got != "assistant" {
t.Fatalf("input.2.role = %q, want assistant output after cached reasoning; body=%s", got, string(gotBody))
}
if got := gjson.GetBytes(gotBody, "input.3.role").String(); got != "user" {
t.Fatalf("input.3.role = %q, want final user message; body=%s", got, string(gotBody))
}
}
func TestCodexExecutorReasoningReplayCacheExecuteStreamStoresFinalDoneForClaude(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
addedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(7)
doneEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(8)
var bodies [][]byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
bodies = append(bodies, body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"rs_added","type":"reasoning","status":"in_progress","summary":[],"encrypted_content":"` + addedEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_done","type":"reasoning","summary":[],"encrypted_content":"` + doneEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{
ID: "auth-replay-stream",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}
streamResult, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"stream-session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"hello"}]}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream error: %v", err)
}
for chunk := range streamResult.Chunks {
if chunk.Err != nil {
t.Fatalf("stream chunk error: %v", chunk.Err)
}
}
_, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"stream-session-1\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
})
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if len(bodies) != 2 {
t.Fatalf("upstream request count = %d, want 2", len(bodies))
}
secondBody := bodies[1]
if got := gjson.GetBytes(secondBody, "input.0.encrypted_content").String(); got != doneEncryptedContent {
t.Fatalf("stream cached encrypted_content = %q, want final done %q; body=%s", got, doneEncryptedContent, string(secondBody))
}
}
func TestCodexExecutorReasoningReplayCacheClearsOnNonStreamResponseFailedInvalidSignature(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(9)
internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-nonstream", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`))
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.ReadAll(r.Body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"message":"Invalid signature in thinking block","type":"invalid_request_error","code":"invalid_request_error"}}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
_, err := executor.Execute(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-invalid-nonstream",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-invalid-nonstream\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
})
if err == nil {
t.Fatal("expected invalid signature error")
}
if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-nonstream"); ok {
t.Fatal("invalid signature response.failed should clear cached replay item")
}
}
func TestCodexExecutorReasoningReplayCacheClearsOnStreamResponseFailedInvalidSignature(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
cachedEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(10)
internalcache.CacheCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-stream", []byte(`{"type":"reasoning","summary":[],"content":null,"encrypted_content":"`+cachedEncryptedContent+`"}`))
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.ReadAll(r.Body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"message":"Invalid signature in thinking block","type":"invalid_request_error","code":"invalid_request_error"}}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
streamResult, err := executor.ExecuteStream(context.Background(), &cliproxyauth.Auth{
ID: "auth-replay-invalid-stream",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{"model":"gpt-5.4","metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"session-invalid-stream\"}"},"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream setup error: %v", err)
}
gotChunkErr := false
for chunk := range streamResult.Chunks {
if chunk.Err != nil {
gotChunkErr = true
}
}
if !gotChunkErr {
t.Fatal("expected stream chunk error for invalid signature response.failed")
}
if _, ok := internalcache.GetCodexReasoningReplayItem("gpt-5.4", "claude:session-invalid-stream"); ok {
t.Fatal("invalid signature response.failed should clear cached replay item")
}
}
func TestCodexExecutorReasoningReplayCacheReplaysFunctionCallForClaudeToolResult(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
reasoningEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(8)
var bodies [][]byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
bodies = append(bodies, body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_1","type":"reasoning","summary":[],"encrypted_content":"` + reasoningEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","call_id":"call_1","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"in_progress"},"output_index":1}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"fc_1","type":"function_call","call_id":"call_1","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"completed"},"output_index":1}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{
ID: "auth-replay-claude-tool",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}
opts := cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
}
_, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-tool\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"call lookup"}]}],
"tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}]
}`),
}, opts)
if err != nil {
t.Fatalf("first Execute error: %v", err)
}
_, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-tool\"}"},
"messages":[
{"role":"user","content":[{"type":"text","text":"call lookup"}]},
{"role":"user","content":[{"type":"tool_result","tool_use_id":"call_1","content":"sunny"}]}
],
"tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}]
}`),
}, opts)
if err != nil {
t.Fatalf("second Execute error: %v", err)
}
if len(bodies) != 2 {
t.Fatalf("upstream request count = %d, want 2", len(bodies))
}
secondBody := bodies[1]
if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "message" {
t.Fatalf("input.0.type = %q, want initial user message; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.1.type").String(); got != "reasoning" {
t.Fatalf("input.1.type = %q, want cached reasoning; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.2.type").String(); got != "function_call" {
t.Fatalf("input.2.type = %q, want cached function_call; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.2.call_id").String(); got != "call_1" {
t.Fatalf("input.2.call_id = %q, want call_1; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.3.type").String(); got != "function_call_output" {
t.Fatalf("input.3.type = %q, want function_call_output after cached call; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.3.call_id").String(); got != "call_1" {
t.Fatalf("input.3.call_id = %q, want call_1; body=%s", got, string(secondBody))
}
}
func TestCodexExecutorReasoningReplayCacheMatchesShortenedClaudeToolResultCallID(t *testing.T) {
internalcache.ClearCodexReasoningReplayCache()
t.Cleanup(internalcache.ClearCodexReasoningReplayCache)
longCallID := "call_" + strings.Repeat("a", 62)
shortCallID := shortenedCodexReplayCallIDForTest(longCallID)
if len(longCallID) <= 64 || len(shortCallID) > 64 || shortCallID == longCallID {
t.Fatalf("invalid test setup: long=%q short=%q", longCallID, shortCallID)
}
reasoningEncryptedContent := validCodexReasoningEncryptedContentForTestSeed(13)
var bodies [][]byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, errRead := io.ReadAll(r.Body)
if errRead != nil {
t.Fatalf("read body: %v", errRead)
}
bodies = append(bodies, body)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"rs_long","type":"reasoning","summary":[],"encrypted_content":"` + reasoningEncryptedContent + `"},"output_index":0}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.output_item.done","item":{"id":"fc_long","type":"function_call","call_id":"` + longCallID + `","name":"lookup","arguments":"{\"q\":\"weather\"}","status":"completed"},"output_index":1}` + "\n"))
_, _ = w.Write([]byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","created_at":0,"status":"completed","model":"gpt-5.4","output":[]}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{
ID: "auth-replay-claude-short-tool",
Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
},
}
opts := cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("claude"),
Stream: false,
}
_, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-short-tool\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"call lookup"}]}],
"tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}]
}`),
}, opts)
if err != nil {
t.Fatalf("first Execute error: %v", err)
}
_, err = executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4",
Payload: []byte(`{
"model":"gpt-5.4",
"metadata":{"user_id":"{\"device_id\":\"device-test\",\"account_uuid\":\"\",\"session_id\":\"claude-session-short-tool\"}"},
"messages":[
{"role":"user","content":[{"type":"text","text":"call lookup"}]},
{"role":"user","content":[{"type":"tool_result","tool_use_id":"` + shortCallID + `","content":"sunny"}]}
],
"tools":[{"name":"lookup","input_schema":{"type":"object","properties":{"q":{"type":"string"}}}}]
}`),
}, opts)
if err != nil {
t.Fatalf("second Execute error: %v", err)
}
if len(bodies) != 2 {
t.Fatalf("upstream request count = %d, want 2", len(bodies))
}
secondBody := bodies[1]
if got := gjson.GetBytes(secondBody, "input.0.type").String(); got != "message" {
t.Fatalf("input.0.type = %q, want initial user message; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.1.type").String(); got != "reasoning" {
t.Fatalf("input.1.type = %q, want cached reasoning; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.2.type").String(); got != "function_call" {
t.Fatalf("input.2.type = %q, want cached function_call; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.2.call_id").String(); got != shortCallID {
t.Fatalf("input.2.call_id = %q, want shortened call_id %q; body=%s", got, shortCallID, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.3.type").String(); got != "function_call_output" {
t.Fatalf("input.3.type = %q, want function_call_output after cached call; body=%s", got, string(secondBody))
}
if got := gjson.GetBytes(secondBody, "input.3.call_id").String(); got != shortCallID {
t.Fatalf("input.3.call_id = %q, want shortened call_id %q; body=%s", got, shortCallID, string(secondBody))
}
}

View File

@@ -159,6 +159,13 @@ func TestCodexTerminalStreamContextLengthErrIgnoresOtherTerminalErrors(t *testin
}
}
func TestCodexTerminalStreamErrIgnoresRateLimitTerminalErrors(t *testing.T) {
_, _, ok := codexTerminalStreamErr([]byte(`{"type":"error","error":{"type":"rate_limit_error","code":"rate_limit_exceeded","message":"Rate limit reached."}}`))
if ok {
t.Fatal("rate limit terminal error should not be handled by replay terminal error path")
}
}
func statusCodeFromTestError(t *testing.T, err error) int {
t.Helper()

View File

@@ -835,21 +835,11 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto
}
var cache helps.CodexCache
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cached, ok := helps.GetCodexCache(key); ok {
cache = cached
} else {
cache = helps.CodexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
helps.SetCodexCache(key, cache)
}
if sourceFormatEqual(from, sdktranslator.FormatClaude) {
if cached, ok := codexClaudeCodePromptCache(req); ok {
cache = cached
}
} else if from == "openai-response" {
} else if sourceFormatEqual(from, sdktranslator.FormatOpenAIResponse) {
if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() {
cache.ID = promptCacheKey.String()
}
@@ -899,10 +889,11 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth *
betaHeader = codexResponsesWebsocketBetaHeaderValue
}
headers.Set("OpenAI-Beta", betaHeader)
sessionFallback := ""
if strings.Contains(headers.Get("User-Agent"), "Mac OS") {
ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", uuid.NewString())
sessionFallback = uuid.NewString()
}
ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", "")
ensureCodexWebsocketSessionHeader(headers, ginHeaders, sessionFallback)
if originator := strings.TrimSpace(ginHeaders.Get("Originator")); originator != "" {
headers.Set("Originator", originator)
} else if !isAPIKey {
@@ -927,6 +918,32 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth *
return headers
}
func ensureCodexWebsocketSessionHeader(target http.Header, source http.Header, fallbackValue string) {
if target == nil {
return
}
sessionID := codexSessionHeaderValue(target)
if sessionID == "" {
sessionID = codexSessionHeaderValue(source)
}
if sessionID == "" {
sessionID = strings.TrimSpace(fallbackValue)
}
if sessionID != "" {
setHeaderCasePreserved(target, "session_id", sessionID)
}
deleteHeaderCaseInsensitive(target, "Session-Id")
}
func codexSessionHeaderValue(headers http.Header) string {
for _, key := range []string{"Session-Id", "Session_id", "session_id"} {
if value := strings.TrimSpace(headerValueCaseInsensitive(headers, key)); value != "" {
return value
}
}
return ""
}
func codexAuthUsesAPIKey(auth *cliproxyauth.Auth) bool {
if auth == nil || auth.Attributes == nil {
return false
@@ -969,6 +986,47 @@ func setHeaderCasePreserved(headers http.Header, key string, value string) {
headers[key] = []string{value}
}
func setCodexSessionHeaderCasePreserved(headers http.Header, fallbackKey string, value string) {
if headers == nil {
return
}
fallbackKey = strings.TrimSpace(fallbackKey)
value = strings.TrimSpace(value)
if fallbackKey == "" || value == "" {
return
}
selectedKey := ""
if _, ok := headers[fallbackKey]; ok && codexSessionHeaderKeyUsesUnderscore(fallbackKey) {
selectedKey = fallbackKey
} else {
for existingKey := range headers {
if codexSessionHeaderKeyUsesUnderscore(existingKey) {
selectedKey = existingKey
break
}
}
}
if selectedKey == "" {
selectedKey = fallbackKey
}
for existingKey := range headers {
if codexSessionHeaderKey(existingKey) && existingKey != selectedKey {
delete(headers, existingKey)
}
}
headers[selectedKey] = []string{value}
}
func codexSessionHeaderKey(key string) bool {
normalized := strings.ToLower(strings.TrimSpace(key))
return normalized == "session_id" || normalized == "session-id"
}
func codexSessionHeaderKeyUsesUnderscore(key string) bool {
return strings.ToLower(strings.TrimSpace(key)) == "session_id"
}
func headerValueCaseInsensitive(headers http.Header, key string) string {
key = strings.TrimSpace(key)
if headers == nil || key == "" {

View File

@@ -197,7 +197,7 @@ func TestApplyCodexWebsocketHeadersPassesThroughClientIdentityHeaders(t *testing
"Version": "0.115.0-alpha.27",
"X-Codex-Turn-Metadata": `{"turn_id":"turn-1"}`,
"X-Client-Request-Id": "019d2233-e240-7162-992d-38df0a2a0e0d",
"session_id": "legacy-session",
"session-id": "legacy-session",
})
headers := applyCodexWebsocketHeaders(ctx, http.Header{}, auth, "", nil)
@@ -217,11 +217,32 @@ func TestApplyCodexWebsocketHeadersPassesThroughClientIdentityHeaders(t *testing
if got := headers.Get("X-Client-Request-Id"); got != "019d2233-e240-7162-992d-38df0a2a0e0d" {
t.Fatalf("X-Client-Request-Id = %s, want %s", got, "019d2233-e240-7162-992d-38df0a2a0e0d")
}
if got := headerValueCaseInsensitive(headers, "session_id"); got != "legacy-session" {
t.Fatalf("session_id = %s, want legacy-session", got)
if got := headers["session_id"]; len(got) != 1 || got[0] != "legacy-session" {
t.Fatalf("session_id = %#v, want [legacy-session]", got)
}
if _, ok := headers["session_id"]; !ok {
t.Fatalf("expected lowercase session_id header key, got %#v", headers)
if got := headers.Get("Session-Id"); got != "" {
t.Fatalf("Session-Id = %s, want empty", got)
}
}
func TestApplyCodexWebsocketHeadersCanonicalizesLegacyUnderscoreSessionHeader(t *testing.T) {
auth := &cliproxyauth.Auth{
Provider: "codex",
Metadata: map[string]any{"email": "user@example.com"},
}
ctx := contextWithGinHeaders(map[string]string{
"Originator": "Codex Desktop",
"User-Agent": "codex_cli_rs/0.1.0",
"Session_id": "legacy-underscore-session",
})
headers := applyCodexWebsocketHeaders(ctx, http.Header{}, auth, "", nil)
if got := headers["session_id"]; len(got) != 1 || got[0] != "legacy-underscore-session" {
t.Fatalf("session_id = %#v, want [legacy-underscore-session]", got)
}
if got := headers.Get("Session-Id"); got != "" {
t.Fatalf("Session-Id = %s, want empty", got)
}
}
@@ -361,22 +382,79 @@ func TestApplyCodexWebsocketHeadersUsesCanonicalAccountHeader(t *testing.T) {
}
}
func TestApplyCodexPromptCacheHeadersSetsLowercaseSessionAndLegacyConversation(t *testing.T) {
func TestApplyCodexPromptCacheHeadersSetsSessionIDAndLegacyConversation(t *testing.T) {
req := cliproxyexecutor.Request{Model: "gpt-5-codex", Payload: []byte(`{"prompt_cache_key":"cache-1"}`)}
_, headers := applyCodexPromptCacheHeaders("openai-response", req, []byte(`{"model":"gpt-5-codex"}`))
if got := headerValueCaseInsensitive(headers, "session_id"); got != "cache-1" {
t.Fatalf("session_id = %s, want cache-1", got)
if got := headers["session_id"]; len(got) != 1 || got[0] != "cache-1" {
t.Fatalf("session_id = %#v, want [cache-1]", got)
}
if _, ok := headers["session_id"]; !ok {
t.Fatalf("expected lowercase session_id key, got %#v", headers)
if got := headers.Get("Session-Id"); got != "" {
t.Fatalf("Session-Id = %s, want empty", got)
}
if got := headers.Get("Conversation_id"); got != "cache-1" {
t.Fatalf("Conversation_id = %s, want cache-1", got)
}
}
func TestApplyCodexPromptCacheHeadersClaudeUsesClaudeCodeSessionID(t *testing.T) {
firstReq := cliproxyexecutor.Request{
Model: "gpt-5-codex-claude-ws-cache-session",
Payload: []byte(`{
"metadata":{"user_id":"{\"device_id\":\"device-a\",\"account_uuid\":\"\",\"session_id\":\"ws-cache-session-1\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]
}`),
}
secondReq := cliproxyexecutor.Request{
Model: "gpt-5-codex-claude-ws-cache-session",
Payload: []byte(`{
"metadata":{"user_id":"{\"device_id\":\"device-b\",\"account_uuid\":\"\",\"session_id\":\"ws-cache-session-1\"}"},
"messages":[{"role":"user","content":[{"type":"text","text":"next"}]}]
}`),
}
firstBody, firstHeaders := applyCodexPromptCacheHeaders("claude", firstReq, []byte(`{"model":"gpt-5-codex"}`))
secondBody, secondHeaders := applyCodexPromptCacheHeaders("claude", secondReq, []byte(`{"model":"gpt-5-codex"}`))
firstKey := gjson.GetBytes(firstBody, "prompt_cache_key").String()
secondKey := gjson.GetBytes(secondBody, "prompt_cache_key").String()
if firstKey == "" {
t.Fatalf("first prompt_cache_key is empty; body=%s", string(firstBody))
}
if secondKey != firstKey {
t.Fatalf("same Claude Code session_id produced different websocket prompt_cache_key: first=%q second=%q", firstKey, secondKey)
}
if got := firstHeaders["session_id"]; len(got) != 1 || got[0] != firstKey {
t.Fatalf("first session_id = %#v, want [%q]", got, firstKey)
}
if got := secondHeaders["session_id"]; len(got) != 1 || got[0] != firstKey {
t.Fatalf("second session_id = %#v, want [%q]", got, firstKey)
}
}
func TestApplyCodexPromptCacheHeadersClaudeRejectsBareUserID(t *testing.T) {
req := cliproxyexecutor.Request{
Model: "gpt-5-codex-claude-ws-cache-bare-user",
Payload: []byte(`{"metadata":{"user_id":"same-user-across-chats"},"messages":[{"role":"user","content":[{"type":"text","text":"first"}]}]}`),
}
body, headers := applyCodexPromptCacheHeaders("claude", req, []byte(`{"model":"gpt-5-codex"}`))
if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "" {
t.Fatalf("bare metadata.user_id must not create websocket prompt_cache_key, got %q; body=%s", got, string(body))
}
if got := headers["session_id"]; len(got) != 0 {
t.Fatalf("bare metadata.user_id must not create websocket session_id, got %#v", got)
}
if got := headers.Get("Session-Id"); got != "" {
t.Fatalf("bare metadata.user_id must not create websocket Session-Id, got %q", got)
}
if got := headers.Get("Conversation_id"); got != "" {
t.Fatalf("bare metadata.user_id must not create websocket Conversation_id, got %q", got)
}
}
func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testing.T) {
cfg := &config.Config{
Routing: config.RoutingConfig{SessionAffinity: true},
@@ -402,8 +480,11 @@ func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testin
if gotKey := gjson.GetBytes(body, "prompt_cache_key").String(); gotKey != expectedPromptCacheKey {
t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedPromptCacheKey)
}
if gotSession := headerValueCaseInsensitive(headers, "session_id"); gotSession != expectedPromptCacheKey {
t.Fatalf("session_id = %q, want %q", gotSession, expectedPromptCacheKey)
if gotSession := headers["session_id"]; len(gotSession) != 1 || gotSession[0] != expectedPromptCacheKey {
t.Fatalf("session_id = %#v, want [%q]", gotSession, expectedPromptCacheKey)
}
if gotCanonicalSession := headers.Get("Session-Id"); gotCanonicalSession != "" {
t.Fatalf("Session-Id = %q, want empty", gotCanonicalSession)
}
if gotRequestID := headers.Get("X-Client-Request-Id"); gotRequestID != expectedPromptCacheKey {
t.Fatalf("X-Client-Request-Id = %q, want %q", gotRequestID, expectedPromptCacheKey)