feat(executor): refine session and conversation header handling for Codex

- Updated session handling to replace `Session_id` and `Conversation_id` headers with new logic ensuring consistent use of `Cache.ID` and prompt keys.
- Restored `Session_id` as a priority extraction source for `ExtractSessionID`.
- Added tests to validate case-sensitive and case-insensitive headers, canonical account header usage, and session key preservation.
- Removed legacy support for deprecated `Conversation_id` header to clean up API.
This commit is contained in:
Luis Pater
2026-06-01 11:27:10 +08:00
parent fb4f39d300
commit 05b972479a
9 changed files with 129 additions and 61 deletions

View File

@@ -119,7 +119,7 @@ routing:
strategy: "round-robin" # round-robin (default), fill-first
# Enable universal session-sticky routing for all clients.
# Session IDs are extracted from: metadata.user_id (Claude Code session format),
# X-Session-ID, X-Amp-Thread-Id (Amp CLI),
# X-Session-ID, Session_id (Codex), X-Amp-Thread-Id (Amp CLI),
# X-Client-Request-Id (PI), conversation_id, or first few messages hash.
# Automatic failover is always enabled when bound auth becomes unavailable.
session-affinity: false # default: false

View File

@@ -237,7 +237,7 @@ type RoutingConfig struct {
// SessionAffinity enables universal session-sticky routing for all clients.
// Session IDs are extracted from multiple sources:
// metadata.user_id (Claude Code session format), X-Session-ID,
// metadata.user_id (Claude Code session format), X-Session-ID, Session_id (Codex),
// X-Amp-Thread-Id (Amp CLI thread), X-Client-Request-Id (PI), metadata.user_id,
// conversation_id, or message hash.
// Automatic failover is always enabled when bound auth becomes unavailable.

View File

@@ -931,6 +931,9 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form
if err != nil {
return nil, nil, codexIdentityConfuseState{}, err
}
if cache.ID != "" {
httpReq.Header.Set("Session_id", cache.ID)
}
return httpReq, rawJSON, identityState, nil
}
@@ -964,7 +967,6 @@ func applyCodexIdentityConfuseHeaders(headers http.Header, state *codexIdentityC
if headers == nil {
return
}
defer deleteDeprecatedCodexConversationHeader(headers)
if state == nil || !state.enabled {
return
}
@@ -977,6 +979,12 @@ func applyCodexIdentityConfuseHeaders(headers http.Header, state *codexIdentityC
}
setHeaderCasePreserved(headers, "Session-Id", state.promptCacheKey)
if headerValueCaseInsensitive(headers, "session_id") != "" {
setHeaderCasePreserved(headers, "session_id", state.promptCacheKey)
}
if headerValueCaseInsensitive(headers, "Conversation_id") != "" {
setHeaderCasePreserved(headers, "Conversation_id", state.promptCacheKey)
}
headers.Set("X-Client-Request-Id", state.promptCacheKey)
headers.Set("Thread-Id", state.promptCacheKey)
headers.Set("X-Codex-Window-Id", state.promptCacheKey+":0")
@@ -1072,6 +1080,10 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s
cfgUserAgent, _ := codexHeaderDefaults(cfg, auth)
ensureHeaderWithConfigPrecedence(r.Header, ginHeaders, "User-Agent", cfgUserAgent, codexUserAgent)
if strings.Contains(r.Header.Get("User-Agent"), "Mac OS") {
misc.EnsureHeader(r.Header, ginHeaders, "Session_id", uuid.NewString())
}
if stream {
r.Header.Set("Accept", "text/event-stream")
} else {
@@ -1090,19 +1102,18 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s
} else if !isAPIKey {
r.Header.Set("Originator", codexOriginator)
}
// if !isAPIKey {
// if auth != nil && auth.Metadata != nil {
// if accountID, ok := auth.Metadata["account_id"].(string); ok {
// r.Header.Set("Chatgpt-Account-Id", accountID)
// }
// }
// }
if !isAPIKey {
if auth != nil && auth.Metadata != nil {
if accountID, ok := auth.Metadata["account_id"].(string); ok {
r.Header.Set("Chatgpt-Account-Id", accountID)
}
}
}
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(r, attrs)
deleteDeprecatedCodexConversationHeader(r.Header)
}
func newCodexStatusErr(statusCode int, body []byte) statusErr {

View File

@@ -47,8 +47,8 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom
if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != "" {
t.Fatalf("Conversation_id = %q, want empty", gotConversation)
}
if gotSession := httpReq.Header.Get("Session_id"); gotSession != "" {
t.Fatalf("Session_id = %q, want empty", gotSession)
if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey {
t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey)
}
httpReq2, _, _, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, nil, req, req.Payload, rawJSON)
@@ -119,8 +119,8 @@ func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing
t.Fatalf("%s = %q, want %q", headerName, gotHeader, expectedPromptCacheKey)
}
}
if gotSession := httpReq.Header.Get("Session_id"); gotSession != "" {
t.Fatalf("Session_id = %q, want empty", gotSession)
if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedPromptCacheKey {
t.Fatalf("Session_id = %q, want %q", gotSession, expectedPromptCacheKey)
}
if gotWindow := httpReq.Header.Get("X-Codex-Window-Id"); gotWindow != expectedPromptCacheKey+":0" {
t.Fatalf("X-Codex-Window-Id = %q, want %q", gotWindow, expectedPromptCacheKey+":0")
@@ -137,6 +137,20 @@ func TestCodexExecutorCacheHelper_IdentityConfuseRemapsBodyAndHeaders(t *testing
}
}
func TestApplyCodexHeadersUsesAccountHeaderForOAuth(t *testing.T) {
httpReq := httptest.NewRequest("POST", "https://example.com/responses", nil)
auth := &cliproxyauth.Auth{
Provider: "codex",
Metadata: map[string]any{"account_id": "acct-1"},
}
applyCodexHeaders(httpReq, auth, "oauth-token", true, nil)
if got := httpReq.Header.Get("Chatgpt-Account-Id"); got != "acct-1" {
t.Fatalf("Chatgpt-Account-Id = %q, want acct-1", got)
}
}
func TestCodexIdentityConfuseKeepsClientBodySeparateFromUpstreamBody(t *testing.T) {
cfg := &config.Config{
Routing: config.RoutingConfig{Strategy: "fill-first"},

View File

@@ -857,6 +857,8 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto
if cache.ID != "" {
rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID)
setHeaderCasePreserved(headers, "session_id", cache.ID)
headers.Set("Conversation_id", cache.ID)
}
return rawJSON, headers
@@ -897,27 +899,30 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth *
betaHeader = codexResponsesWebsocketBetaHeaderValue
}
headers.Set("OpenAI-Beta", betaHeader)
if strings.Contains(headers.Get("User-Agent"), "Mac OS") {
ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", uuid.NewString())
}
ensureHeaderCasePreserved(headers, ginHeaders, "session_id", "", "")
if originator := strings.TrimSpace(ginHeaders.Get("Originator")); originator != "" {
headers.Set("Originator", originator)
} else if !isAPIKey {
headers.Set("Originator", codexOriginator)
}
// if !isAPIKey {
// if auth != nil && auth.Metadata != nil {
// if accountID, ok := auth.Metadata["account_id"].(string); ok {
// if trimmed := strings.TrimSpace(accountID); trimmed != "" {
// setHeaderCasePreserved(headers, "ChatGPT-Account-ID", trimmed)
// }
// }
// }
// }
if !isAPIKey {
if auth != nil && auth.Metadata != nil {
if accountID, ok := auth.Metadata["account_id"].(string); ok {
if trimmed := strings.TrimSpace(accountID); trimmed != "" {
setHeaderCasePreserved(headers, "ChatGPT-Account-ID", trimmed)
}
}
}
}
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(&http.Request{Header: headers}, attrs)
deleteDeprecatedCodexConversationHeader(headers)
return headers
}
@@ -993,10 +998,6 @@ func deleteHeaderCaseInsensitive(headers http.Header, key string) {
}
}
func deleteDeprecatedCodexConversationHeader(headers http.Header) {
deleteHeaderCaseInsensitive(headers, "Conversation_id")
}
func codexHeaderDefaults(cfg *config.Config, auth *cliproxyauth.Auth) (string, string) {
if cfg == nil || auth == nil {
return "", ""

View File

@@ -217,8 +217,11 @@ func TestApplyCodexWebsocketHeadersPassesThroughClientIdentityHeaders(t *testing
if got := headers.Get("X-Client-Request-Id"); got != "019d2233-e240-7162-992d-38df0a2a0e0d" {
t.Fatalf("X-Client-Request-Id = %s, want %s", got, "019d2233-e240-7162-992d-38df0a2a0e0d")
}
if got := headerValueCaseInsensitive(headers, "session_id"); got != "" {
t.Fatalf("session_id = %q, want empty", got)
if got := headerValueCaseInsensitive(headers, "session_id"); got != "legacy-session" {
t.Fatalf("session_id = %s, want legacy-session", got)
}
if _, ok := headers["session_id"]; !ok {
t.Fatalf("expected lowercase session_id header key, got %#v", headers)
}
}
@@ -341,16 +344,36 @@ func TestApplyCodexWebsocketHeadersPreservesExplicitAPIKeyUserAgent(t *testing.T
}
}
func TestApplyCodexPromptCacheHeadersDoesNotSetDeprecatedConversationHeader(t *testing.T) {
func TestApplyCodexWebsocketHeadersUsesCanonicalAccountHeader(t *testing.T) {
auth := &cliproxyauth.Auth{Provider: "codex", Metadata: map[string]any{"account_id": "acct-1"}}
headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, auth, "", nil)
if got := headerValueCaseInsensitive(headers, "ChatGPT-Account-ID"); got != "acct-1" {
t.Fatalf("ChatGPT-Account-ID = %s, want acct-1", got)
}
values, ok := headers["ChatGPT-Account-ID"]
if !ok {
t.Fatalf("expected exact ChatGPT-Account-ID key, got %#v", headers)
}
if len(values) != 1 || values[0] != "acct-1" {
t.Fatalf("ChatGPT-Account-ID values = %#v, want [acct-1]", values)
}
}
func TestApplyCodexPromptCacheHeadersSetsLowercaseSessionAndLegacyConversation(t *testing.T) {
req := cliproxyexecutor.Request{Model: "gpt-5-codex", Payload: []byte(`{"prompt_cache_key":"cache-1"}`)}
_, headers := applyCodexPromptCacheHeaders("openai-response", req, []byte(`{"model":"gpt-5-codex"}`))
if got := headerValueCaseInsensitive(headers, "session_id"); got != "" {
t.Fatalf("session_id = %q, want empty", got)
if got := headerValueCaseInsensitive(headers, "session_id"); got != "cache-1" {
t.Fatalf("session_id = %s, want cache-1", got)
}
if got := headers.Get("Conversation_id"); got != "" {
t.Fatalf("Conversation_id = %q, want empty", got)
if _, ok := headers["session_id"]; !ok {
t.Fatalf("expected lowercase session_id key, got %#v", headers)
}
if got := headers.Get("Conversation_id"); got != "cache-1" {
t.Fatalf("Conversation_id = %s, want cache-1", got)
}
}
@@ -379,8 +402,8 @@ func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testin
if gotKey := gjson.GetBytes(body, "prompt_cache_key").String(); gotKey != expectedPromptCacheKey {
t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedPromptCacheKey)
}
if gotSession := headerValueCaseInsensitive(headers, "session_id"); gotSession != "" {
t.Fatalf("session_id = %q, want empty", gotSession)
if gotSession := headerValueCaseInsensitive(headers, "session_id"); gotSession != expectedPromptCacheKey {
t.Fatalf("session_id = %q, want %q", gotSession, expectedPromptCacheKey)
}
if gotRequestID := headers.Get("X-Client-Request-Id"); gotRequestID != expectedPromptCacheKey {
t.Fatalf("X-Client-Request-Id = %q, want %q", gotRequestID, expectedPromptCacheKey)
@@ -388,8 +411,8 @@ func TestApplyCodexWebsocketHeadersIdentityConfuseRemapsPromptCacheKey(t *testin
if gotThreadID := headers.Get("Thread-Id"); gotThreadID != expectedPromptCacheKey {
t.Fatalf("Thread-Id = %q, want %q", gotThreadID, expectedPromptCacheKey)
}
if gotConversation := headers.Get("Conversation_id"); gotConversation != "" {
t.Fatalf("Conversation_id = %q, want empty", gotConversation)
if gotConversation := headers.Get("Conversation_id"); gotConversation != expectedPromptCacheKey {
t.Fatalf("Conversation_id = %q, want %q", gotConversation, expectedPromptCacheKey)
}
if gotWindowID := headers.Get("X-Codex-Window-Id"); gotWindowID != expectedPromptCacheKey+":0" {
t.Fatalf("X-Codex-Window-Id = %q, want %q", gotWindowID, expectedPromptCacheKey+":0")

View File

@@ -147,6 +147,12 @@ func websocketDownstreamSessionKey(req *http.Request) string {
return sessionID
}
}
if sessionID := strings.TrimSpace(req.Header.Get("Session-Id")); sessionID != "" {
return sessionID
}
if sessionID := strings.TrimSpace(req.Header.Get("Session_id")); sessionID != "" {
return sessionID
}
return ""
}

View File

@@ -471,11 +471,12 @@ func NewSessionAffinitySelectorWithConfig(cfg SessionAffinityConfig) *SessionAff
// Priority for session ID extraction:
// 1. metadata.user_id (Claude Code format with _session_{uuid}) - highest priority
// 2. X-Session-ID header
// 3. X-Amp-Thread-Id header (Amp CLI thread ID)
// 4. X-Client-Request-Id header (PI)
// 5. metadata.user_id (non-Claude Code format)
// 6. conversation_id field in request body
// 7. Stable hash from first few messages content (fallback)
// 3. Session_id header (Codex)
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
// 5. X-Client-Request-Id header (PI)
// 6. metadata.user_id (non-Claude Code format)
// 7. conversation_id field in request body
// 8. Stable hash from first few messages content (fallback)
//
// Note: The cache key includes provider, session ID, and model to handle cases where
// a session uses multiple models (e.g., gemini-2.5-pro and gemini-3-flash-preview)
@@ -572,11 +573,12 @@ func (s *SessionAffinitySelector) InvalidateAuth(authID string) {
// Priority order:
// 1. metadata.user_id (Claude Code format with _session_{uuid}) - highest priority for Claude Code clients
// 2. X-Session-ID header
// 3. X-Amp-Thread-Id header (Amp CLI thread ID)
// 4. X-Client-Request-Id header (PI)
// 5. metadata.user_id (non-Claude Code format)
// 6. conversation_id field in request body
// 7. Stable hash from first few messages content (fallback)
// 3. Session_id header (Codex)
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
// 5. X-Client-Request-Id header (PI)
// 6. metadata.user_id (non-Claude Code format)
// 7. conversation_id field in request body
// 8. Stable hash from first few messages content (fallback)
func ExtractSessionID(headers http.Header, payload []byte, metadata map[string]any) string {
primary, _ := extractSessionIDs(headers, payload, metadata)
return primary
@@ -612,14 +614,24 @@ func extractSessionIDs(headers http.Header, payload []byte, metadata map[string]
}
}
// 3. X-Amp-Thread-Id header (Amp CLI thread ID)
// 3. Session_id header (Codex)
if headers != nil {
if sid := headers.Get("Session-Id"); sid != "" {
return "codex:" + sid, ""
}
if sid := headers.Get("Session_id"); sid != "" {
return "codex:" + sid, ""
}
}
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
if headers != nil {
if tid := headers.Get("X-Amp-Thread-Id"); tid != "" {
return "amp:" + tid, ""
}
}
// 4. X-Client-Request-Id header (PI)
// 5. X-Client-Request-Id header (PI)
if headers != nil {
if rid := headers.Get("X-Client-Request-Id"); rid != "" {
return "clientreq:" + rid, ""
@@ -630,18 +642,18 @@ func extractSessionIDs(headers http.Header, payload []byte, metadata map[string]
return "", ""
}
// 5. metadata.user_id (non-Claude Code format)
// 6. metadata.user_id (non-Claude Code format)
userID := gjson.GetBytes(payload, "metadata.user_id").String()
if userID != "" {
return "user:" + userID, ""
}
// 6. conversation_id field
// 7. conversation_id field
if convID := gjson.GetBytes(payload, "conversation_id").String(); convID != "" {
return "conv:" + convID, ""
}
// 7. Hash-based fallback from message content
// 8. Hash-based fallback from message content
return extractMessageHashIDs(payload)
}

View File

@@ -776,15 +776,16 @@ func TestExtractSessionID_Headers(t *testing.T) {
}
}
func TestExtractSessionID_IgnoresCodexSessionIDHeader(t *testing.T) {
func TestExtractSessionID_CodexSessionIDHeader(t *testing.T) {
t.Parallel()
headers := make(http.Header)
headers.Set("Session_id", "codex-session-123")
got := ExtractSessionID(headers, nil, nil)
if got != "" {
t.Errorf("ExtractSessionID() with deprecated Session_id = %q, want empty", got)
want := "codex:codex-session-123"
if got != want {
t.Errorf("ExtractSessionID() with Session_id = %q, want %q", got, want)
}
}
@@ -801,7 +802,7 @@ func TestExtractSessionID_ClientRequestIDHeader(t *testing.T) {
}
}
func TestExtractSessionID_ClientRequestIDIgnoresDeprecatedCodexSessionID(t *testing.T) {
func TestExtractSessionID_CodexSessionIDPriorityOverClientRequestID(t *testing.T) {
t.Parallel()
headers := make(http.Header)
@@ -809,9 +810,9 @@ func TestExtractSessionID_ClientRequestIDIgnoresDeprecatedCodexSessionID(t *test
headers.Set("Session_id", "codex-session-456")
got := ExtractSessionID(headers, nil, nil)
want := "clientreq:pi-session-123"
want := "codex:codex-session-456"
if got != want {
t.Errorf("ExtractSessionID() = %q, want %q (deprecated Session_id should be ignored)", got, want)
t.Errorf("ExtractSessionID() = %q, want %q (Session_id should take priority over X-Client-Request-Id)", got, want)
}
}