Files
CLIProxyAPI/internal/cache/codex_reasoning_replay_cache.go
Luis Pater 2a050dc95d feat: enhance fault tolerance for kv-based caching and introduce additional tests
- Updated Antigravity Credits fallback to handle KV store unavailability as a service error.
- Enhanced signature caching mechanisms with request-time KV access and sliding expiration.
- Added and improved tests for KV client interactions, including error handling and expiration behaviors.
- Introduced `CacheSignatureBestEffort` for non-critical signature caching and clarified function flows with required context.
- Ensured consistent error reporting for missing or unavailable KV stores in various scenarios.
- Replaced direct `homekv` calls with injectable KV client interfaces for `antigravity` and `codex_reasoning_replay` modules.
- Improved error reporting and handling for KV operations, including `KVGet`, `KVSet`, `KVDel`, and `KVExpire`.
- Introduced dedicated fake KV clients for expanded and granular test coverage.
- Added new unit tests to validate KV client behaviors and error scenarios, ensuring robustness and sliding expiration functionality.
2026-06-14 21:11:35 +08:00

338 lines
12 KiB
Go

package cache
import (
"context"
"encoding/json"
"sort"
"strings"
"sync"
"time"
homekv "github.com/router-for-me/CLIProxyAPI/v7/internal/home"
"github.com/router-for-me/CLIProxyAPI/v7/internal/signature"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
const (
// CodexReasoningReplayCacheTTL limits how long encrypted reasoning replay
// items stay in process memory.
CodexReasoningReplayCacheTTL = 1 * time.Hour
// CodexReasoningReplayCacheMaxEntries bounds process memory for replay
// continuity. Oldest entries are evicted first.
CodexReasoningReplayCacheMaxEntries = 10240
// CodexReasoningReplayCacheEvictBatchSize leaves headroom after the cache
// reaches capacity so high write volume does not rescan the map every turn.
CodexReasoningReplayCacheEvictBatchSize = 128
)
type codexReasoningReplayEntry struct {
Items [][]byte
Timestamp time.Time
}
var (
codexReasoningReplayMu sync.Mutex
codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry)
)
type codexReasoningReplayKVClient interface {
KVGet(ctx context.Context, key string) ([]byte, bool, error)
KVSet(ctx context.Context, key string, value []byte, opts homekv.KVSetOptions) (bool, error)
KVDel(ctx context.Context, keys ...string) (int64, error)
KVExpire(ctx context.Context, key string, ttl time.Duration) (bool, error)
}
var currentCodexReasoningReplayKVClient = func() (codexReasoningReplayKVClient, bool, error) {
return homekv.CurrentKVClient()
}
// CacheCodexReasoningReplayItem stores a final GPT/Codex reasoning item for
// stateless replay. The stored item is normalized to the minimal shape accepted
// by Responses input replay.
func CacheCodexReasoningReplayItem(modelName, sessionKey string, item []byte) bool {
return CacheCodexReasoningReplayItems(modelName, sessionKey, [][]byte{item})
}
// CacheCodexReasoningReplayItems stores the final GPT/Codex assistant output
// items needed to replay a stateless next turn.
func CacheCodexReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool {
return CacheCodexReasoningReplayItemsBestEffort(context.Background(), modelName, sessionKey, items)
}
// CacheCodexReasoningReplayItemsBestEffort stores replay items for completed response paths.
func CacheCodexReasoningReplayItemsBestEffort(ctx context.Context, modelName, sessionKey string, items [][]byte) bool {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return false
}
normalized, ok := normalizeCodexReasoningReplayItems(items)
if !ok {
return false
}
if client, homeMode, errClient := currentCodexReasoningReplayKVClient(); homeMode {
if errClient != nil {
log.Errorf("home kv best-effort codex reasoning replay set failed prefix=cpa:codex:*: %v", errClient)
return false
}
raw, errMarshal := json.Marshal(normalized)
if errMarshal != nil {
log.Errorf("home kv best-effort codex reasoning replay set failed prefix=cpa:codex:*: %v", errMarshal)
return false
}
written, errSet := client.KVSet(ctx, codexReasoningReplayKVKey(modelName, sessionKey), raw, homekv.KVSetOptions{EX: CodexReasoningReplayCacheTTL})
if errSet != nil {
log.Errorf("home kv best-effort codex reasoning replay set failed prefix=cpa:codex:*: %v", errSet)
return false
}
return written
}
cacheCleanupOnce.Do(startCacheCleanup)
now := time.Now()
codexReasoningReplayMu.Lock()
defer codexReasoningReplayMu.Unlock()
codexReasoningReplayEntries[key] = codexReasoningReplayEntry{
Items: normalized,
Timestamp: now,
}
if len(codexReasoningReplayEntries) > CodexReasoningReplayCacheMaxEntries {
evictOldestCodexReasoningReplayEntries(CodexReasoningReplayCacheEvictBatchSize)
}
return true
}
// GetCodexReasoningReplayItem retrieves a normalized reasoning replay item.
func GetCodexReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) {
items, ok := GetCodexReasoningReplayItems(modelName, sessionKey)
if !ok || len(items) == 0 {
return nil, false
}
return items[0], true
}
// GetCodexReasoningReplayItems retrieves normalized assistant output items.
func GetCodexReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) {
items, ok, err := GetCodexReasoningReplayItemsRequired(context.Background(), modelName, sessionKey)
if err == nil {
return items, ok
}
return nil, false
}
// GetCodexReasoningReplayItemsRequired retrieves replay items for request-time paths.
func GetCodexReasoningReplayItemsRequired(ctx context.Context, modelName, sessionKey string) ([][]byte, bool, error) {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return nil, false, nil
}
client, homeMode, errClient := currentCodexReasoningReplayKVClient()
if homeMode {
if errClient != nil {
return nil, false, errClient
}
raw, found, errGet := client.KVGet(ctx, codexReasoningReplayKVKey(modelName, sessionKey))
if errGet != nil || !found {
return nil, false, errGet
}
var homeItems [][]byte
if errUnmarshal := json.Unmarshal(raw, &homeItems); errUnmarshal != nil {
return nil, false, errUnmarshal
}
if _, errExpire := client.KVExpire(ctx, codexReasoningReplayKVKey(modelName, sessionKey), CodexReasoningReplayCacheTTL); errExpire != nil {
return nil, false, errExpire
}
return cloneCodexReasoningReplayItems(homeItems), true, nil
}
cacheCleanupOnce.Do(startCacheCleanup)
now := time.Now()
codexReasoningReplayMu.Lock()
defer codexReasoningReplayMu.Unlock()
entry, ok := codexReasoningReplayEntries[key]
if !ok {
return nil, false, nil
}
if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL {
delete(codexReasoningReplayEntries, key)
return nil, false, nil
}
entry.Timestamp = now
codexReasoningReplayEntries[key] = entry
return cloneCodexReasoningReplayItems(entry.Items), true, nil
}
// DeleteCodexReasoningReplayItem removes one replay item after upstream rejects
// it or the caller otherwise knows it is stale.
func DeleteCodexReasoningReplayItem(modelName, sessionKey string) {
if errDelete := DeleteCodexReasoningReplayItemRequired(context.Background(), modelName, sessionKey); errDelete != nil {
return
}
}
// DeleteCodexReasoningReplayItemRequired removes one replay item for request-time paths.
func DeleteCodexReasoningReplayItemRequired(ctx context.Context, modelName, sessionKey string) error {
key := codexReasoningReplayCacheKey(modelName, sessionKey)
if key == "" {
return nil
}
client, homeMode, errClient := currentCodexReasoningReplayKVClient()
if homeMode {
if errClient != nil {
return errClient
}
_, errDel := client.KVDel(ctx, codexReasoningReplayKVKey(modelName, sessionKey))
return errDel
}
codexReasoningReplayMu.Lock()
delete(codexReasoningReplayEntries, key)
codexReasoningReplayMu.Unlock()
return nil
}
// ClearCodexReasoningReplayCache clears all Codex reasoning replay state.
func ClearCodexReasoningReplayCache() {
codexReasoningReplayMu.Lock()
codexReasoningReplayEntries = make(map[string]codexReasoningReplayEntry)
codexReasoningReplayMu.Unlock()
}
func codexReasoningReplayCacheKey(modelName, sessionKey string) string {
modelName = strings.TrimSpace(modelName)
sessionKey = strings.TrimSpace(sessionKey)
if modelName == "" || sessionKey == "" {
return ""
}
// The session key is the continuity boundary. Keep this independent from
// the selected upstream Codex credential so auth failover can preserve replay.
return strings.Join([]string{"codex-reasoning-replay", modelName, sessionKey}, "\x00")
}
func codexReasoningReplayKVKey(modelName, sessionKey string) string {
return "cpa:codex:reasoning-replay:" + homekv.HashKeyPart(strings.TrimSpace(modelName)) + ":" + homekv.HashKeyPart(strings.TrimSpace(sessionKey))
}
func normalizeCodexReasoningReplayItems(items [][]byte) ([][]byte, bool) {
normalized := make([][]byte, 0, len(items))
for _, item := range items {
normalizedItem, ok := normalizeCodexReasoningReplayItem(item)
if ok {
normalized = append(normalized, normalizedItem)
}
}
return normalized, len(normalized) > 0
}
func normalizeCodexReasoningReplayItem(item []byte) ([]byte, bool) {
itemResult := gjson.ParseBytes(item)
switch strings.TrimSpace(itemResult.Get("type").String()) {
case "reasoning":
return normalizeCodexReasoningReplayReasoningItem(itemResult)
case "function_call":
return normalizeCodexReasoningReplayFunctionCallItem(itemResult)
case "custom_tool_call":
return normalizeCodexReasoningReplayCustomToolCallItem(itemResult)
default:
return nil, false
}
}
func normalizeCodexReasoningReplayReasoningItem(itemResult gjson.Result) ([]byte, bool) {
encryptedContentResult := itemResult.Get("encrypted_content")
if encryptedContentResult.Type != gjson.String {
return nil, false
}
encryptedContent := encryptedContentResult.String()
if encryptedContent != strings.TrimSpace(encryptedContent) {
return nil, false
}
if _, err := signature.InspectGPTReasoningSignature(encryptedContent); err != nil {
return nil, false
}
normalized := []byte(`{"type":"reasoning","summary":[],"content":null}`)
normalized, _ = sjson.SetBytes(normalized, "encrypted_content", encryptedContent)
return normalized, true
}
func normalizeCodexReasoningReplayFunctionCallItem(itemResult gjson.Result) ([]byte, bool) {
callID := strings.TrimSpace(itemResult.Get("call_id").String())
name := strings.TrimSpace(itemResult.Get("name").String())
arguments := itemResult.Get("arguments")
if callID == "" || name == "" || arguments.Type != gjson.String {
return nil, false
}
normalized := []byte(`{"type":"function_call"}`)
normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
normalized, _ = sjson.SetBytes(normalized, "name", name)
normalized, _ = sjson.SetBytes(normalized, "arguments", arguments.String())
return normalized, true
}
func normalizeCodexReasoningReplayCustomToolCallItem(itemResult gjson.Result) ([]byte, bool) {
callID := strings.TrimSpace(itemResult.Get("call_id").String())
name := strings.TrimSpace(itemResult.Get("name").String())
input := itemResult.Get("input")
if callID == "" || name == "" || !input.Exists() {
return nil, false
}
normalized := []byte(`{"type":"custom_tool_call","status":"completed"}`)
if status := strings.TrimSpace(itemResult.Get("status").String()); status != "" {
normalized, _ = sjson.SetBytes(normalized, "status", status)
}
normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
normalized, _ = sjson.SetBytes(normalized, "name", name)
if input.Type == gjson.String {
normalized, _ = sjson.SetBytes(normalized, "input", input.String())
} else {
normalized, _ = sjson.SetRawBytes(normalized, "input", []byte(input.Raw))
}
return normalized, true
}
func cloneCodexReasoningReplayItems(items [][]byte) [][]byte {
cloned := make([][]byte, 0, len(items))
for _, item := range items {
cloned = append(cloned, append([]byte(nil), item...))
}
return cloned
}
func evictOldestCodexReasoningReplayEntries(count int) {
if count <= 0 || len(codexReasoningReplayEntries) == 0 {
return
}
type candidate struct {
key string
timestamp time.Time
}
candidates := make([]candidate, 0, len(codexReasoningReplayEntries))
for key, entry := range codexReasoningReplayEntries {
candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp})
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].timestamp.Before(candidates[j].timestamp)
})
if count > len(candidates) {
count = len(candidates)
}
for i := 0; i < count; i++ {
delete(codexReasoningReplayEntries, candidates[i].key)
}
}
func purgeExpiredCodexReasoningReplayCache(now time.Time) {
codexReasoningReplayMu.Lock()
for key, entry := range codexReasoningReplayEntries {
if now.Sub(entry.Timestamp) > CodexReasoningReplayCacheTTL {
delete(codexReasoningReplayEntries, key)
}
}
codexReasoningReplayMu.Unlock()
}