Files
CLIProxyAPIPlus/sdk/cliproxy/auth/session_cache.go
sususu98 7c24d54ca8 feat(session-affinity): add session-sticky routing for multi-account load balancing
When multiple auth credentials are configured, requests from the same
session are now routed to the same credential, improving upstream prompt
cache hit rates and maintaining context continuity.

Core components:
- SessionAffinitySelector: wraps RoundRobin/FillFirst selectors with
  session-to-auth binding; automatic failover when bound auth is
  unavailable, re-binding via the fallback selector for even distribution
- SessionCache: TTL-based in-memory cache with background cleanup
  goroutine, supporting per-session and per-auth invalidation
- StoppableSelector interface: lifecycle hook for selectors holding
  resources, called during Manager.StopAutoRefresh()

Session ID extraction priority (extractSessionIDs):
1. metadata.user_id with Claude Code session format (old
   user_{hash}_session_{uuid} and new JSON {session_id} format)
2. X-Session-ID header (generic client support)
3. metadata.user_id (non-Claude format, used as-is)
4. conversation_id field
5. Stable FNV hash from system prompt + first user/assistant messages
   (fallback for clients with no explicit session ID); returns both a
   full hash (primaryID) and a short hash without assistant content
   (fallbackID) to inherit bindings from the first turn

Multi-format message hash covers OpenAI messages, Claude system array,
Gemini contents/systemInstruction, and OpenAI Responses API input items
(including inline messages with role but no type field).

Configuration (config.yaml routing section):
- session-affinity: bool (default false)
- session-affinity-ttl: duration string (default "1h")
- claude-code-session-affinity: bool (deprecated, alias for session-affinity)
All three fields trigger selector rebuild on config hot reload.

Side effect: Idempotency-Key header is no longer auto-generated with a
random UUID when absent — only forwarded when explicitly provided by the
client, to avoid polluting session hash extraction.
2026-04-16 00:18:47 +08:00

153 lines
3.1 KiB
Go

package auth
import (
"sync"
"time"
)
// sessionEntry is the value kept per session: the auth the session is bound
// to and the instant after which that binding counts as expired.
type sessionEntry struct {
	// authID identifies the auth the session is bound to.
	authID string
	// expiresAt is the deadline of the binding.
	expiresAt time.Time
}
// SessionCache is an in-memory, TTL-based mapping from session ID to auth ID.
// Expired bindings are removed lazily on lookup and periodically by a
// background goroutine that runs until Stop is called.
type SessionCache struct {
	// mu guards entries.
	mu sync.RWMutex
	// entries holds the current bindings keyed by session ID.
	entries map[string]sessionEntry
	// ttl is the lifetime given to a binding when it is set or refreshed.
	ttl time.Duration
	// stopCh is closed to tell the cleanup goroutine to exit.
	stopCh chan struct{}
}
// NewSessionCache builds an empty cache whose bindings live for ttl.
// A non-positive ttl is replaced by a 30 minute default.
//
// The constructor starts a background goroutine that periodically evicts
// expired entries; call Stop to terminate it.
func NewSessionCache(ttl time.Duration) *SessionCache {
	const defaultTTL = 30 * time.Minute

	cache := new(SessionCache)
	cache.entries = map[string]sessionEntry{}
	cache.stopCh = make(chan struct{})
	cache.ttl = ttl
	if cache.ttl <= 0 {
		cache.ttl = defaultTTL
	}

	go cache.cleanupLoop()
	return cache
}
// Get retrieves the auth ID bound to a session, if the binding is still valid.
// It does NOT refresh the TTL on access.
//
// It returns ("", false) for an empty sessionID, an unknown session, or an
// expired binding. An expired binding is removed from the cache as a side
// effect.
func (c *SessionCache) Get(sessionID string) (string, bool) {
	if sessionID == "" {
		return "", false
	}

	// Fast path: a shared lock is enough for a hit on a live binding.
	c.mu.RLock()
	entry, ok := c.entries[sessionID]
	c.mu.RUnlock()
	if !ok {
		return "", false
	}
	now := time.Now()
	if !now.After(entry.expiresAt) {
		return entry.authID, true
	}

	// The binding looked expired, but the read lock has been released, so a
	// concurrent Set or GetAndRefresh may have replaced or refreshed it in the
	// meantime. Re-check under the write lock so a fresh binding is never
	// deleted by mistake.
	c.mu.Lock()
	defer c.mu.Unlock()
	current, ok := c.entries[sessionID]
	if !ok {
		return "", false
	}
	if now.After(current.expiresAt) {
		delete(c.entries, sessionID)
		return "", false
	}
	return current.authID, true
}
// GetAndRefresh looks up the auth ID bound to sessionID and, on a hit, pushes
// the binding's expiry out to now + ttl so that active sessions keep their
// binding.
//
// It returns ("", false) for an empty sessionID, an unknown session, or an
// expired binding; an expired binding is deleted before returning.
func (c *SessionCache) GetAndRefresh(sessionID string) (string, bool) {
	if sessionID == "" {
		return "", false
	}

	now := time.Now()

	c.mu.Lock()
	defer c.mu.Unlock()

	bound, found := c.entries[sessionID]
	switch {
	case !found:
		return "", false
	case now.After(bound.expiresAt):
		delete(c.entries, sessionID)
		return "", false
	}

	// Hit: extend the lifetime and store the updated copy back in the map.
	bound.expiresAt = now.Add(c.ttl)
	c.entries[sessionID] = bound
	return bound.authID, true
}
// Set binds sessionID to authID, replacing any existing binding and starting
// a fresh TTL. The call is a no-op when either argument is empty.
func (c *SessionCache) Set(sessionID, authID string) {
	if sessionID == "" || authID == "" {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	binding := sessionEntry{
		authID:    authID,
		expiresAt: time.Now().Add(c.ttl),
	}
	c.entries[sessionID] = binding
}
// Invalidate drops the binding for sessionID, if any.
// An empty sessionID is ignored.
func (c *SessionCache) Invalidate(sessionID string) {
	if sessionID == "" {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, sessionID)
}
// InvalidateAuth drops every session binding that points at authID.
// It is intended for the case where an auth becomes unavailable.
// An empty authID is ignored.
func (c *SessionCache) InvalidateAuth(authID string) {
	if authID == "" {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	for sessionID, bound := range c.entries {
		if bound.authID != authID {
			continue
		}
		delete(c.entries, sessionID)
	}
}
// Stop terminates the background cleanup goroutine by closing stopCh.
//
// Stop is idempotent and safe to call from multiple goroutines: the
// "already closed?" check and the close happen under c.mu, so two concurrent
// callers cannot both reach close and trigger a double-close panic.
func (c *SessionCache) Stop() {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.stopCh:
		// Already stopped; nothing to do.
	default:
		close(c.stopCh)
	}
}
// cleanupLoop evicts expired entries every ttl/2 until stopCh is closed.
// It blocks, so it is meant to run in its own goroutine.
func (c *SessionCache) cleanupLoop() {
	// time.NewTicker panics on a non-positive interval, and ttl/2 truncates
	// to zero for a 1ns TTL. Clamp to a small floor so tiny TTLs neither
	// panic nor spin.
	const minInterval = time.Millisecond
	interval := c.ttl / 2
	if interval < minInterval {
		interval = minInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-c.stopCh:
			return
		case <-ticker.C:
			c.cleanup()
		}
	}
}
// cleanup makes one pass over the cache and deletes every binding whose
// expiry lies strictly before the time at which the pass started.
func (c *SessionCache) cleanup() {
	cutoff := time.Now()

	c.mu.Lock()
	defer c.mu.Unlock()

	for sessionID, bound := range c.entries {
		if bound.expiresAt.Before(cutoff) {
			delete(c.entries, sessionID)
		}
	}
}