diff --git a/.gitignore b/.gitignore
index 9824a36d8..728fa9595 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,7 @@ static/*
 
 # Authentication data
 auths/*
+/auths
 !auths/.gitkeep
 
 # Documentation
@@ -38,6 +39,7 @@ GEMINI.md
 .worktrees/
 .codex/*
 .claude/*
+.claude
 .gemini/*
 .serena/*
 .agent/*
diff --git a/internal/cache/antigravity_reasoning_replay_cache.go b/internal/cache/antigravity_reasoning_replay_cache.go
new file mode 100644
index 000000000..a9f58c28d
--- /dev/null
+++ b/internal/cache/antigravity_reasoning_replay_cache.go
@@ -0,0 +1,347 @@
+package cache
+
+import (
+	"context"
+	"encoding/json"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	homekv "github.com/router-for-me/CLIProxyAPI/v7/internal/home"
+	log "github.com/sirupsen/logrus"
+	"github.com/tidwall/gjson"
+	"github.com/tidwall/sjson"
+)
+
+const (
+	// AntigravityReasoningReplayCacheTTL limits how long reasoning replay items
+	// stay cached. It is the expiry window for the in-process map and is also
+	// passed as the EX option when items are written to the home KV store.
+	AntigravityReasoningReplayCacheTTL = 1 * time.Hour
+
+	// AntigravityReasoningReplayCacheMaxEntries bounds process memory for replay
+	// continuity. Oldest entries are evicted first.
+	AntigravityReasoningReplayCacheMaxEntries = 10240
+
+	// AntigravityReasoningReplayCacheEvictBatchSize leaves headroom after the cache
+	// reaches capacity so high write volume does not rescan the map every turn.
+	AntigravityReasoningReplayCacheEvictBatchSize = 128
+
+	// minAntigravityThoughtSignatureReplayLen is the minimum length (in bytes,
+	// after trimming) a thought signature must have to be accepted for caching.
+	minAntigravityThoughtSignatureReplayLen = 16
+)
+
+// antigravityReasoningReplayEntry is one in-process cache record: the normalized
+// replay items plus the time they were last written or read.
+type antigravityReasoningReplayEntry struct {
+	Items     [][]byte
+	Timestamp time.Time
+}
+
+var (
+	// antigravityReasoningReplayMu guards antigravityReasoningReplayEntries.
+	antigravityReasoningReplayMu      sync.Mutex
+	antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry)
+)
+
+// antigravityReasoningReplayKVClient is the subset of the home KV client used by
+// this cache when running in home mode.
+type antigravityReasoningReplayKVClient interface {
+	KVGet(ctx context.Context, key string) ([]byte, bool, error)
+	KVSet(ctx context.Context, key string, value []byte, opts homekv.KVSetOptions) (bool, error)
+	KVDel(ctx context.Context, keys ...string) (int64, error)
+	KVExpire(ctx context.Context, key string, ttl time.Duration) (bool, error)
+}
+
+// currentAntigravityReasoningReplayKVClient resolves the active home KV client.
+// It is a package variable (rather than a direct call) so it can be replaced;
+// presumably this is a test seam - confirm against the package tests.
+var currentAntigravityReasoningReplayKVClient = func() (antigravityReasoningReplayKVClient, bool, error) {
+	return homekv.CurrentKVClient()
+}
+
+// CacheAntigravityReasoningReplayItem stores a single replay item for the given
+// model and session. It is a convenience wrapper around
+// CacheAntigravityReasoningReplayItems and reports whether the item was stored.
+func CacheAntigravityReasoningReplayItem(modelName, sessionKey string, item []byte) bool {
+	return CacheAntigravityReasoningReplayItems(modelName, sessionKey, [][]byte{item})
+}
+
+// CacheAntigravityReasoningReplayItems stores the replay items for the given
+// model and session using a background context. It reports whether the items
+// were stored.
+func CacheAntigravityReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool {
+	return CacheAntigravityReasoningReplayItemsBestEffort(context.Background(), modelName, sessionKey, items)
+}
+
+// CacheAntigravityReasoningReplayItemsBestEffort stores replay items for completed response paths.
+func CacheAntigravityReasoningReplayItemsBestEffort(ctx context.Context, modelName, sessionKey string, items [][]byte) bool {
+	// An empty cache key means the model name or session key was blank.
+	cacheKey := antigravityReasoningReplayCacheKey(modelName, sessionKey)
+	if cacheKey == "" {
+		return false
+	}
+	// Only items that survive normalization are worth storing.
+	replayItems, usable := normalizeAntigravityReasoningReplayItems(items)
+	if !usable {
+		return false
+	}
+
+	kv, homeMode, errKV := currentAntigravityReasoningReplayKVClient()
+	if !homeMode {
+		// Local mode: keep the items in the process-wide map.
+		cacheCleanupOnce.Do(startCacheCleanup)
+		stamp := time.Now()
+		antigravityReasoningReplayMu.Lock()
+		defer antigravityReasoningReplayMu.Unlock()
+		antigravityReasoningReplayEntries[cacheKey] = antigravityReasoningReplayEntry{
+			Items:     replayItems,
+			Timestamp: stamp,
+		}
+		if len(antigravityReasoningReplayEntries) > AntigravityReasoningReplayCacheMaxEntries {
+			evictOldestAntigravityReasoningReplayEntries(AntigravityReasoningReplayCacheEvictBatchSize)
+		}
+		return true
+	}
+
+	// Home mode: every failure is logged with the same message and reported as
+	// "not stored" instead of being returned to the caller.
+	const failureFormat = "home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v"
+	if errKV != nil {
+		log.Errorf(failureFormat, errKV)
+		return false
+	}
+	encoded, errEncode := json.Marshal(replayItems)
+	if errEncode != nil {
+		log.Errorf(failureFormat, errEncode)
+		return false
+	}
+	stored, errStore := kv.KVSet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), encoded, homekv.KVSetOptions{EX: AntigravityReasoningReplayCacheTTL})
+	if errStore != nil {
+		log.Errorf(failureFormat, errStore)
+		return false
+	}
+	return stored
+}
+
+// GetAntigravityReasoningReplayItem retrieves a normalized reasoning replay item.
+func GetAntigravityReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) {
+	// Only the first cached item is returned; any further items are ignored.
+	items, ok := GetAntigravityReasoningReplayItems(modelName, sessionKey)
+	if !ok || len(items) == 0 {
+		return nil, false
+	}
+	return items[0], true
+}
+
+// GetAntigravityReasoningReplayItems retrieves the cached replay items using a
+// background context. Lookup errors are reported as a cache miss.
+func GetAntigravityReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) {
+	items, ok, err := GetAntigravityReasoningReplayItemsRequired(context.Background(), modelName, sessionKey)
+	if err == nil {
+		return items, ok
+	}
+	return nil, false
+}
+
+// GetAntigravityReasoningReplayItemsRequired retrieves replay items for request-time paths.
+//
+// It returns (items, true, nil) on a hit, (nil, false, nil) on a miss or a blank
+// model/session, and (nil, false, err) when the home KV client, the KV read,
+// the JSON decode or the TTL refresh fails. A successful read refreshes the
+// entry's expiry in both home and in-process modes. The returned slices are
+// copies, so callers may modify them.
+func GetAntigravityReasoningReplayItemsRequired(ctx context.Context, modelName, sessionKey string) ([][]byte, bool, error) {
+	key := antigravityReasoningReplayCacheKey(modelName, sessionKey)
+	if key == "" {
+		return nil, false, nil
+	}
+	client, homeMode, errClient := currentAntigravityReasoningReplayKVClient()
+	if homeMode {
+		if errClient != nil {
+			return nil, false, errClient
+		}
+		raw, found, errGet := client.KVGet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey))
+		if errGet != nil || !found {
+			// errGet is nil on a plain miss, so this also covers "not found".
+			return nil, false, errGet
+		}
+		var homeItems [][]byte
+		if errUnmarshal := json.Unmarshal(raw, &homeItems); errUnmarshal != nil {
+			return nil, false, errUnmarshal
+		}
+		// Sliding expiration: a failed refresh is treated as a failed read.
+		if _, errExpire := client.KVExpire(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), AntigravityReasoningReplayCacheTTL); errExpire != nil {
+			return nil, false, errExpire
+		}
+		return cloneAntigravityReasoningReplayItems(homeItems), true, nil
+	}
+
+	cacheCleanupOnce.Do(startCacheCleanup)
+	now := time.Now()
+	antigravityReasoningReplayMu.Lock()
+	defer antigravityReasoningReplayMu.Unlock()
+	entry, ok := antigravityReasoningReplayEntries[key]
+	if !ok {
+		return nil, false, nil
+	}
+	if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL {
+		// Expired entries are removed lazily on read.
+		delete(antigravityReasoningReplayEntries, key)
+		return nil, false, nil
+	}
+	// Refresh the timestamp so an entry that keeps being read stays alive.
+	entry.Timestamp = now
+	antigravityReasoningReplayEntries[key] = entry
+	return cloneAntigravityReasoningReplayItems(entry.Items), true, nil
+}
+
+// DeleteAntigravityReasoningReplayItem removes one replay item after upstream rejects
+// it or the caller otherwise knows it is stale. Deletion errors are discarded.
+func DeleteAntigravityReasoningReplayItem(modelName, sessionKey string) {
+	if errDelete := DeleteAntigravityReasoningReplayItemRequired(context.Background(), modelName, sessionKey); errDelete != nil {
+		return
+	}
+}
+
+// DeleteAntigravityReasoningReplayItemRequired removes one replay item for request-time paths.
+// A blank model/session is a no-op. In home mode the KV client or delete error
+// is returned; the in-process delete always succeeds.
+func DeleteAntigravityReasoningReplayItemRequired(ctx context.Context, modelName, sessionKey string) error {
+	key := antigravityReasoningReplayCacheKey(modelName, sessionKey)
+	if key == "" {
+		return nil
+	}
+	client, homeMode, errClient := currentAntigravityReasoningReplayKVClient()
+	if homeMode {
+		if errClient != nil {
+			return errClient
+		}
+		_, errDel := client.KVDel(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey))
+		return errDel
+	}
+	antigravityReasoningReplayMu.Lock()
+	delete(antigravityReasoningReplayEntries, key)
+	antigravityReasoningReplayMu.Unlock()
+	return nil
+}
+
+// ClearAntigravityReasoningReplayCache clears the in-process Antigravity reasoning
+// replay state. It does not touch the home KV store.
+func ClearAntigravityReasoningReplayCache() {
+	antigravityReasoningReplayMu.Lock()
+	antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry)
+	antigravityReasoningReplayMu.Unlock()
+}
+
+// antigravityReasoningReplayCacheKey builds the in-process map key from the
+// trimmed model name and session key. It returns "" when either is blank.
+func antigravityReasoningReplayCacheKey(modelName, sessionKey string) string {
+	modelName = strings.TrimSpace(modelName)
+	sessionKey = strings.TrimSpace(sessionKey)
+	if modelName == "" || sessionKey == "" {
+		return ""
+	}
+	// The session key is the continuity boundary. Keep this independent from
+	// the selected upstream credential so auth failover can preserve replay.
+	return strings.Join([]string{"antigravity-reasoning-replay", modelName, sessionKey}, "\x00")
+}
+
+// antigravityReasoningReplayKVKey builds the home KV key from the hashed, trimmed
+// model name and session key.
+func antigravityReasoningReplayKVKey(modelName, sessionKey string) string {
+	return "cpa:antigravity:reasoning-replay:" + homekv.HashKeyPart(strings.TrimSpace(modelName)) + ":" + homekv.HashKeyPart(strings.TrimSpace(sessionKey))
+}
+
+// normalizeAntigravityReasoningReplayItems normalizes every item, drops the ones
+// that are rejected, and reports whether at least one item remains.
+func normalizeAntigravityReasoningReplayItems(items [][]byte) ([][]byte, bool) {
+	normalized := make([][]byte, 0, len(items))
+	for _, item := range items {
+		normalizedItem, ok := normalizeAntigravityReasoningReplayItem(item)
+		if ok {
+			normalized = append(normalized, normalizedItem)
+		}
+	}
+	return normalized, len(normalized) > 0
+}
+
+// normalizeAntigravityReasoningReplayItem dispatches on the item's "type" field.
+// Only "thought_signature" and "function_call_part" items are accepted.
+func normalizeAntigravityReasoningReplayItem(item []byte) ([]byte, bool) {
+	itemResult := gjson.ParseBytes(item)
+	switch strings.TrimSpace(itemResult.Get("type").String()) {
+	case "thought_signature":
+		return normalizeAntigravityThoughtSignatureReplayItem(itemResult)
+	case "function_call_part":
+		return normalizeAntigravityFunctionCallPartReplayItem(itemResult)
+	default:
+		return nil, false
+	}
+}
+
+// normalizeAntigravityThoughtSignatureReplayItem rebuilds a thought_signature item
+// with only the signature and, when numeric, contentIndex and partIndex. The
+// signature is read from "thoughtSignature" or, failing that, "thought_signature";
+// items whose signature is shorter than the minimum length are rejected.
+func normalizeAntigravityThoughtSignatureReplayItem(itemResult gjson.Result) ([]byte, bool) {
+	sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
+	if sig == "" {
+		sig = strings.TrimSpace(itemResult.Get("thought_signature").String())
+	}
+	if sig == "" || len(sig) < minAntigravityThoughtSignatureReplayLen {
+		return nil, false
+	}
+	normalized := []byte(`{"type":"thought_signature"}`)
+	normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig)
+	if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number {
+		normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int())
+	}
+	if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number {
+		normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int())
+	}
+	return normalized, true
+}
+
+// normalizeAntigravityFunctionCallPartReplayItem rebuilds a function_call_part item
+// with call_id (optional), name, args, thoughtSignature (optional) and, when
+// numeric, contentIndex and partIndex. Missing name/args/id values fall back to
+// a nested "functionCall" object. Items without a name or args are rejected.
+func normalizeAntigravityFunctionCallPartReplayItem(itemResult gjson.Result) ([]byte, bool) {
+	callID := strings.TrimSpace(itemResult.Get("call_id").String())
+	if callID == "" {
+		callID = strings.TrimSpace(itemResult.Get("id").String())
+	}
+	name := strings.TrimSpace(itemResult.Get("name").String())
+	args := itemResult.Get("args")
+	if name == "" || !args.Exists() {
+		fc := itemResult.Get("functionCall")
+		if fc.Exists() {
+			if callID == "" {
+				callID = strings.TrimSpace(fc.Get("id").String())
+			}
+			if name == "" {
+				name = strings.TrimSpace(fc.Get("name").String())
+			}
+			if !args.Exists() {
+				args = fc.Get("args")
+			}
+		}
+	}
+	if name == "" || !args.Exists() {
+		return nil, false
+	}
+	normalized := []byte(`{"type":"function_call_part"}`)
+	if callID != "" {
+		normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
+	}
+	normalized, _ = sjson.SetBytes(normalized, "name", name)
+	// String args stay a JSON string; anything else is copied as raw JSON.
+	if args.Type == gjson.String {
+		normalized, _ = sjson.SetBytes(normalized, "args", args.String())
+	} else {
+		normalized, _ = sjson.SetRawBytes(normalized, "args", []byte(args.Raw))
+	}
+	// NOTE(review): unlike the thought_signature normalizer, only the camelCase
+	// "thoughtSignature" field is read here and no minimum length is enforced.
+	sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
+	if sig != "" {
+		normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig)
+	}
+	if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number {
+		normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int())
+	}
+	if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number {
+		normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int())
+	}
+	return normalized, true
+}
+
+// cloneAntigravityReasoningReplayItems returns a deep copy of items so callers
+// cannot modify the cached byte slices.
+func cloneAntigravityReasoningReplayItems(items [][]byte) [][]byte {
+	cloned := make([][]byte, 0, len(items))
+	for _, item := range items {
+		cloned = append(cloned, append([]byte(nil), item...))
+	}
+	return cloned
+}
+
+// evictOldestAntigravityReasoningReplayEntries deletes up to count entries with
+// the oldest timestamps. It does not take antigravityReasoningReplayMu itself;
+// the caller visible in this file invokes it while holding the lock.
+func evictOldestAntigravityReasoningReplayEntries(count int) {
+	if count <= 0 || len(antigravityReasoningReplayEntries) == 0 {
+		return
+	}
+	type candidate struct {
+		key       string
+		timestamp time.Time
+	}
+	candidates := make([]candidate, 0, len(antigravityReasoningReplayEntries))
+	for key, entry := range antigravityReasoningReplayEntries {
+		candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp})
+	}
+	sort.Slice(candidates, func(i, j int) bool {
+		return candidates[i].timestamp.Before(candidates[j].timestamp)
+	})
+	if count > len(candidates) {
+		count = len(candidates)
+	}
+	for i := 0; i < count; i++ {
+		delete(antigravityReasoningReplayEntries, candidates[i].key)
+	}
+}
+
+// purgeExpiredAntigravityReasoningReplayCache removes every in-process entry whose
+// timestamp is older than the TTL relative to now.
+func purgeExpiredAntigravityReasoningReplayCache(now time.Time) {
+	antigravityReasoningReplayMu.Lock()
+	for key, entry := range antigravityReasoningReplayEntries {
+		if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL {
+			delete(antigravityReasoningReplayEntries, key)
+		}
+	}
+	antigravityReasoningReplayMu.Unlock()
+}
diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go
index 1f54458e4..72c3ddebc 100644
--- a/internal/cache/signature_cache.go
+++ b/internal/cache/signature_cache.go
@@ -109,6 +109,7 @@ func purgeExpiredCaches() {
 		return true
 	})
 	purgeExpiredCodexReasoningReplayCache(now)
+	purgeExpiredAntigravityReasoningReplayCache(now)
 }
 
 // CacheSignature stores a thinking signature for a given model group and text.
diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go
index 6fd1146d2..2f113a46b 100644
--- a/internal/runtime/executor/antigravity_executor.go
+++ b/internal/runtime/executor/antigravity_executor.go
@@ -306,9 +306,6 @@ func validateAntigravityRequestSignatures(ctx context.Context, modelName string,
 	rawJSON = antigravityclaude.StripEmptySignatureThinkingBlocks(rawJSON)
 	logAntigravitySignatureStrip(before, countClaudeThinkingBlocks(rawJSON), "prefix_cleanup", "empty_or_non_claude_signature")
 	if cache.SignatureCacheEnabled() {
-		if errRequire := antigravityclaude.RequireCachedThinkingSignatures(ctx, modelName, rawJSON); errRequire != nil {
-			return nil, homeKVUnavailableStatusErr(errRequire)
-		}
 		return rawJSON, nil
 	}
 	if !cache.SignatureBypassStrictMode() {
@@ -691,6 +688,15 @@ attemptLoop:
 				helps.MarkCreditsUsed(ctx)
 			}
 		}
+		replayScope := antigravityReasoningReplayScope{}
+		if antigravityUsesReasoningReplayCache(baseModel) {
+			var errReplay error
+			requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload)
+			if errReplay != nil {
+				err = errReplay
+				return resp, err
+			}
+		}
 
 		httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, false, opts.Alt, baseURL)
 		if errReq != nil {
@@ -798,6 +804,10 @@ attemptLoop:
 					continue attemptLoop
 				}
 			}
+			if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil {
+				err = errClear
+				return resp, err
+			}
 			err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes)
 			return resp, err
 		}
@@ -806,6 +816,7 @@ attemptLoop:
 		if useCredits {
 			clearAntigravityCreditsFailureState(auth)
 		}
+		cacheAntigravityReasoningReplayFromResponse(ctx, replayScope, requestPayload, bodyBytes)
 		bodyBytes = e.resolveWebSearchGroundingURLs(ctx, auth, from, originalPayload, translated, bodyBytes)
 		reporter.Publish(ctx, helps.ParseAntigravityUsage(bodyBytes))
 		var param any
@@ -1369,6 +1380,15 @@ attemptLoop:
 				helps.MarkCreditsUsed(ctx)
 			}
 		}
+		replayScope := antigravityReasoningReplayScope{}
+		if antigravityUsesReasoningReplayCache(baseModel) {
+			var errReplay error
+			requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload)
+			if errReplay != nil {
+				err = errReplay
+				return nil, err
+			}
+		}
 		httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, true, opts.Alt, baseURL)
 		if errReq != nil {
 			err = errReq
@@ -1487,6 +1507,10 @@ attemptLoop:
 					continue attemptLoop
 				}
 			}
+			if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil {
+				err = errClear
+				return nil, err
+			}
 			err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes)
 			return nil, err
 		}
@@ -1495,12 +1519,16 @@ attemptLoop:
 		if useCredits {
 			clearAntigravityCreditsFailureState(auth)
 		}
+		replayAccumulator := newAntigravityReasoningReplayAccumulator(replayScope, requestPayload)
 		out := make(chan cliproxyexecutor.StreamChunk)
 		go func(resp *http.Response) {
 			defer close(out)
 			defer func() {
+				if replayAccumulator != nil {
+					replayAccumulator.Flush(ctx)
+				}
 				if errClose := resp.Body.Close(); errClose != nil {
 					log.Errorf("antigravity executor: close response body error: %v", errClose)
 				}
 			}()
 			scanner := bufio.NewScanner(resp.Body)
@@ -1509,6 +1537,9 @@ attemptLoop:
 			for scanner.Scan() {
 				line := scanner.Bytes()
 				helps.AppendAPIResponseChunk(ctx, e.cfg, line)
+				if replayAccumulator != nil {
+					replayAccumulator.ObserveSSELine(line)
+				}
 
 				// Filter usage metadata for all models
 				// Only retain usage statistics in the terminal chunk
@@ -2229,9 +2260,10 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau
 			payloadStr, _ = sjson.Delete(payloadStr, "request.generationConfig.maxOutputTokens")
 		}
 
-		bodyReader = strings.NewReader(payloadStr)
+		payloadStrBytes := applyAntigravityNativeSignatureReplayIfNeeded(modelName, []byte(payloadStr))
+		bodyReader = bytes.NewReader(payloadStrBytes)
 		if e.cfg != nil && e.cfg.RequestLog {
-			payloadLog = []byte(payloadStr)
+			payloadLog = append([]byte(nil), payloadStrBytes...)
 		}
 	} else {
 		if strings.Contains(modelName, "claude") {
@@ -2240,6 +2272,7 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau
 			payload, _ = sjson.DeleteBytes(payload, "request.generationConfig.maxOutputTokens")
 		}
 
+		payload = applyAntigravityNativeSignatureReplayIfNeeded(modelName, payload)
 		bodyReader = bytes.NewReader(payload)
 		if e.cfg != nil && e.cfg.RequestLog {
 			payloadLog = append([]byte(nil), payload...)
diff --git a/internal/runtime/executor/antigravity_reasoning_replay.go b/internal/runtime/executor/antigravity_reasoning_replay.go
new file mode 100644
index 000000000..b7d591467
--- /dev/null
+++ b/internal/runtime/executor/antigravity_reasoning_replay.go
@@ -0,0 +1,477 @@
+package executor
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+
+	internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
+	"github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps"
+	cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
+	"github.com/tidwall/gjson"
+	"github.com/tidwall/sjson"
+)
+
+// antigravityReasoningReplayScope identifies one replay cache entry: the model
+// name and the session key the cached items belong to.
+type antigravityReasoningReplayScope struct {
+	modelName  string
+	sessionKey string
+}
+
+// valid reports whether both the model name and the session key are non-blank.
+func (s antigravityReasoningReplayScope) valid() bool {
+	return strings.TrimSpace(s.modelName) != "" && strings.TrimSpace(s.sessionKey) != ""
+}
+
+// antigravityReasoningReplayScopeFromPayload derives a scope from the session ID
+// found in payload. It returns the zero scope when the payload has none.
+func antigravityReasoningReplayScopeFromPayload(modelName string, payload []byte) antigravityReasoningReplayScope {
+	sessionID := antigravityReplaySessionIDFromPayload(payload)
+	if sessionID == "" {
+		return antigravityReasoningReplayScope{}
+	}
+	return antigravityReasoningReplayScope{
+		modelName:  strings.TrimSpace(modelName),
+		sessionKey: "session:" + sessionID,
+	}
+}
+
+// antigravityReasoningReplayScopeFromRequest resolves the replay scope by trying,
+// in order: the translated payload, the original request payload, then the
+// execution-session metadata of opts and req. It returns the zero scope when
+// none of them yields a session. ctx is currently unused.
+func antigravityReasoningReplayScopeFromRequest(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) antigravityReasoningReplayScope {
+	if scope := antigravityReasoningReplayScopeFromPayload(modelName, payload); scope.valid() {
+		return scope
+	}
+	if scope := antigravityReasoningReplayScopeFromPayload(modelName, req.Payload); scope.valid() {
+		return scope
+	}
+	if value := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
+		return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value}
+	}
+	if value := metadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
+		return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value}
+	}
+	_ = ctx
+	return antigravityReasoningReplayScope{}
+}
+
+// antigravityReplaySessionIDFromPayload returns the first non-blank session ID
+// found at sessionId, session_id, request.sessionId or request.session_id.
+func antigravityReplaySessionIDFromPayload(payload []byte) string {
+	if len(payload) == 0 {
+		return ""
+	}
+	for _, path := range []string{"sessionId", "session_id", "request.sessionId", "request.session_id"} {
+		if id := strings.TrimSpace(gjson.GetBytes(payload, path).String()); id != "" {
+			return id
+		}
+	}
+	return ""
+}
+
+// antigravityReasoningReplayPendingModelContentIndex returns the position in
+// request.contents where the model output for this request will land. If the
+// last content entry already has role "model", its index and current part count
+// are returned; otherwise the index one past the end with part offset 0.
+func antigravityReasoningReplayPendingModelContentIndex(payload []byte) (contentIndex int, basePartIndex int) {
+	contents := gjson.GetBytes(payload, "request.contents")
+	if !contents.IsArray() {
+		return 0, 0
+	}
+	arr := contents.Array()
+	if len(arr) == 0 {
+		return 0, 0
+	}
+	last := arr[len(arr)-1]
+	if strings.EqualFold(strings.TrimSpace(last.Get("role").String()), "model") {
+		ci := len(arr) - 1
+		parts := last.Get("parts")
+		base := 0
+		if parts.IsArray() {
+			base = len(parts.Array())
+		}
+		return ci, base
+	}
+	return len(arr), 0
+}
+
+// antigravityReasoningReplayResolveContentIndex maps a cached content index onto
+// the current payload. An in-range index is kept; otherwise the last "model"
+// entry is used, falling back to the last entry (or 0 for an empty array).
+func antigravityReasoningReplayResolveContentIndex(payload []byte, cached int) int {
+	contents := gjson.GetBytes(payload, "request.contents")
+	if !contents.IsArray() {
+		return cached
+	}
+	arr := contents.Array()
+	if cached >= 0 && cached < len(arr) {
+		return cached
+	}
+	for i := len(arr) - 1; i >= 0; i-- {
+		if strings.EqualFold(strings.TrimSpace(arr[i].Get("role").String()), "model") {
+			return i
+		}
+	}
+	if len(arr) == 0 {
+		return 0
+	}
+	return len(arr) - 1
+}
+
+// prepareAntigravityGeminiReasoningReplayPayload applies cached replay items to
+// payload for models that use the replay cache. Other models get the payload
+// back unchanged together with the zero scope.
+func prepareAntigravityGeminiReasoningReplayPayload(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) {
+	if !antigravityUsesReasoningReplayCache(modelName) {
+		return payload, antigravityReasoningReplayScope{}, nil
+	}
+	return applyAntigravityReasoningReplayCache(ctx, modelName, req, opts, payload)
+}
+
+// clearAntigravityReasoningReplayOnInvalidSignature deletes the cached replay
+// items for scope when upstream answered 400 and the body mentions a signature.
+// It returns the cache deletion error, if any.
+func clearAntigravityReasoningReplayOnInvalidSignature(ctx context.Context, scope antigravityReasoningReplayScope, statusCode int, body []byte) error {
+	if !scope.valid() {
+		return nil
+	}
+	if statusCode != http.StatusBadRequest {
+		return nil
+	}
+	bodyText := strings.ToLower(string(body))
+	// NOTE(review): the bare "signature" match already covers the two more
+	// specific spellings, so any 400 body containing "signature" clears the entry.
+	if !strings.Contains(bodyText, "thoughtsignature") && !strings.Contains(bodyText, "thought_signature") && !strings.Contains(bodyText, "signature") {
+		return nil
+	}
+	return internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, scope.modelName, scope.sessionKey)
+}
+
+// applyAntigravityReasoningReplayCache looks up the cached items for the request's
+// scope, drops those the request does not need, and merges the rest into
+// payload. The resolved scope is returned even when nothing was applied. A cache
+// lookup error is returned with the payload unchanged.
+func applyAntigravityReasoningReplayCache(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) {
+	scope := antigravityReasoningReplayScopeFromRequest(ctx, modelName, req, opts, payload)
+	if !scope.valid() {
+		return payload, scope, nil
+	}
+	items, ok, err := internalcache.GetAntigravityReasoningReplayItemsRequired(ctx, scope.modelName, scope.sessionKey)
+	if err != nil || !ok || len(items) == 0 {
+		return payload, scope, err
+	}
+	items = filterAntigravityReasoningReplayItemsForRequest(payload, items)
+	if len(items) == 0 {
+		return payload, scope, nil
+	}
+	updated, okApply := insertAntigravityReasoningReplayItems(payload, items)
+	if !okApply {
+		return payload, scope, nil
+	}
+	return updated, scope, nil
+}
+
+// filterAntigravityReasoningReplayItemsForRequest keeps only the cached items that
+// should be merged into payload: function calls whose key is not already in the
+// request and that have a matching functionResponse, and thought signatures
+// whose target part does not already carry one. Unknown item types are dropped.
+func filterAntigravityReasoningReplayItemsForRequest(payload []byte, items [][]byte) [][]byte {
+	existing := antigravityExistingToolCallKeys(payload)
+	filtered := make([][]byte, 0, len(items))
+	for _, item := range items {
+		itemResult := gjson.ParseBytes(item)
+		switch strings.TrimSpace(itemResult.Get("type").String()) {
+		case "function_call_part":
+			keys := antigravityReplayToolCallKeys(itemResult)
+			if len(keys) == 0 || antigravityAnyKeyExists(existing, keys) {
+				continue
+			}
+			if !antigravityRequestHasMatchingFunctionResponse(payload, itemResult) {
+				continue
+			}
+		case "thought_signature":
+			if antigravityRequestHasThoughtSignatureAt(payload, itemResult) {
+				continue
+			}
+		default:
+			continue
+		}
+		filtered = append(filtered, item)
+	}
+	return filtered
+}
+
+// antigravityExistingToolCallKeys collects the tool-call keys of every
+// functionCall part already present in request.contents.
+func antigravityExistingToolCallKeys(payload []byte) map[string]bool {
+	existing := make(map[string]bool)
+	contents := gjson.GetBytes(payload, "request.contents")
+	if !contents.IsArray() {
+		return existing
+	}
+	for _, content := range contents.Array() {
+		parts := content.Get("parts")
+		if !parts.IsArray() {
+			continue
+		}
+		for _, part := range parts.Array() {
+			if fc := part.Get("functionCall"); fc.Exists() {
+				for _, key := range antigravityReplayToolCallKeysFromPart(fc) {
+					existing[key] = true
+				}
+			}
+		}
+	}
+	return existing
+}
+
+// antigravityReplayToolCallKeys returns the single key derived from the item's
+// name, raw args and call_id, or nil when the item has no name.
+func antigravityReplayToolCallKeys(itemResult gjson.Result) []string {
+	callID := strings.TrimSpace(itemResult.Get("call_id").String())
+	name := strings.TrimSpace(itemResult.Get("name").String())
+	if name == "" {
+		return nil
+	}
+	args := itemResult.Get("args").Raw
+	key := antigravityFunctionCallKey(name, args, callID)
+	if key == "" {
+		return nil
+	}
+	return []string{key}
+}
+
+// antigravityReplayToolCallKeysFromPart derives tool-call keys from a native
+// functionCall object by passing it through antigravityReplayToolCallKeys.
+// NOTE(review): that helper reads "call_id", while native functionCall objects in
+// this file carry "id", so keys built here leave the call ID empty - confirm
+// whether that is intended before relying on ID-sensitive matching.
+func antigravityReplayToolCallKeysFromPart(fc gjson.Result) []string {
+	return antigravityReplayToolCallKeys(gjson.Parse(fc.Raw))
+}
+
+// antigravityFunctionCallKey hashes name, raw args and call ID into a short
+// "fc:<hex>" key (first 8 bytes of the SHA-256). It returns "" for a blank name.
+func antigravityFunctionCallKey(name, argsRaw, callID string) string {
+	name = strings.TrimSpace(name)
+	if name == "" {
+		return ""
+	}
+	h := sha256.Sum256([]byte(strings.Join([]string{name, argsRaw, callID}, "\x00")))
+	return fmt.Sprintf("fc:%x", h[:8])
+}
+
+// antigravityAnyKeyExists reports whether any of keys is present in existing.
+func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool {
+	for _, key := range keys {
+		if existing[key] {
+			return true
+		}
+	}
+	return false
+}
+
+// antigravityRequestHasMatchingFunctionResponse reports whether request.contents
+// holds a functionResponse whose id equals the item's call_id. Items without a
+// call_id are treated as matching.
+func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool {
+	callID := strings.TrimSpace(itemResult.Get("call_id").String())
+	if callID == "" {
+		return true
+	}
+	contents := gjson.GetBytes(payload, "request.contents")
+	if !contents.IsArray() {
+		return false
+	}
+	for _, content := range contents.Array() {
+		parts := content.Get("parts")
+		if !parts.IsArray() {
+			continue
+		}
+		for _, part := range parts.Array() {
+			fr := part.Get("functionResponse")
+			if fr.Exists() && strings.TrimSpace(fr.Get("id").String()) == callID {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// antigravityRequestHasThoughtSignatureAt reports whether the part addressed by
+// the item's contentIndex/partIndex already has a non-blank thoughtSignature.
+func antigravityRequestHasThoughtSignatureAt(payload []byte, itemResult gjson.Result) bool {
+	ci := int(itemResult.Get("contentIndex").Int())
+	pi := int(itemResult.Get("partIndex").Int())
+	path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
+	return strings.TrimSpace(gjson.GetBytes(payload, path).String()) != ""
+}
+
+// insertAntigravityReasoningReplayItems merges the items into payload and reports
+// whether anything changed. Thought signatures are written only where the target
+// part has none; function-call items go through
+// mergeAntigravityFunctionCallPartReplay. Items that fail to apply are skipped.
+func insertAntigravityReasoningReplayItems(payload []byte, items [][]byte) ([]byte, bool) {
+	out := payload
+	changed := false
+	for _, item := range items {
+		itemResult := gjson.ParseBytes(item)
+		switch strings.TrimSpace(itemResult.Get("type").String()) {
+		case "thought_signature":
+			ci := antigravityReasoningReplayResolveContentIndex(out, int(itemResult.Get("contentIndex").Int()))
+			pi := int(itemResult.Get("partIndex").Int())
+			sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
+			if sig == "" {
+				continue
+			}
+			path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
+			if strings.TrimSpace(gjson.GetBytes(out, path).String()) != "" {
+				continue
+			}
+			updated, err := sjson.SetBytes(out, path, sig)
+			if err != nil {
+				continue
+			}
+			out = updated
+			changed = true
+		case "function_call_part":
+			updated, ok := mergeAntigravityFunctionCallPartReplay(out, itemResult)
+			if ok {
+				out = updated
+				changed = true
+			}
+		}
+	}
+	return out, changed
+}
+
+// mergeAntigravityFunctionCallPartReplay writes a cached function-call item into
+// the part addressed by its (resolved) contentIndex and partIndex. The
+// thoughtSignature and the functionCall object are each written only when the
+// target part does not already have one. It reports whether payload changed.
+func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Result) ([]byte, bool) {
+	ci := antigravityReasoningReplayResolveContentIndex(payload, int(itemResult.Get("contentIndex").Int()))
+	pi := int(itemResult.Get("partIndex").Int())
+	sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
+	pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
+	out := payload
+	changed := false
+	if sig != "" && strings.TrimSpace(gjson.GetBytes(out, pathSig).String()) == "" {
+		if updated, err := sjson.SetBytes(out, pathSig, sig); err == nil {
+			out = updated
+			changed = true
+		}
+	}
+	name := strings.TrimSpace(itemResult.Get("name").String())
+	args := itemResult.Get("args")
+	callID := strings.TrimSpace(itemResult.Get("call_id").String())
+	pathFC := fmt.Sprintf("request.contents.%d.parts.%d.functionCall", ci, pi)
+	if !gjson.GetBytes(out, pathFC).Exists() && name != "" && args.Exists() {
+		fc := map[string]any{"name": name}
+		if callID != "" {
+			fc["id"] = callID
+		}
+		if args.Type == gjson.String {
+			fc["args"] = args.String()
+		} else {
+			var parsed any
+			if json.Unmarshal([]byte(args.Raw), &parsed) == nil {
+				fc["args"] = parsed
+			}
+		}
+		if updated, err := sjson.SetBytes(out, pathFC, fc); err == nil {
+			out = updated
+			changed = true
+		}
+	}
+	return out, changed
+}
+
+// antigravityReasoningReplayAccumulator collects replay items from one upstream
+// response (streamed or not) so they can be cached for the next turn.
+type antigravityReasoningReplayAccumulator struct {
+	scope          antigravityReasoningReplayScope
+	requestPayload []byte
+	items          [][]byte
+	seenFC         map[string]bool
+	contentIndex   int
+	nextPartIndex  int
+}
+
+// newAntigravityReasoningReplayAccumulator returns an accumulator for scope, or
+// nil when scope is invalid. The request payload is copied, and the starting
+// content/part indexes are taken from the pending model slot of that payload.
+func newAntigravityReasoningReplayAccumulator(scope antigravityReasoningReplayScope, requestPayload []byte) *antigravityReasoningReplayAccumulator {
+	if !scope.valid() {
+		return nil
+	}
+	contentIndex, basePartIndex := antigravityReasoningReplayPendingModelContentIndex(requestPayload)
+	return &antigravityReasoningReplayAccumulator{
+		scope:          scope,
+		requestPayload: append([]byte(nil), requestPayload...),
+		seenFC:         make(map[string]bool),
+		contentIndex:   contentIndex,
+		nextPartIndex:  basePartIndex,
+	}
+}
+
+// ObserveSSELine extracts the JSON payload from one SSE line and records its
+// parts. It is safe to call on a nil accumulator.
+func (a *antigravityReasoningReplayAccumulator) ObserveSSELine(line []byte) {
+	if a == nil {
+		return
+	}
+	payload := helps.JSONPayload(line)
+	if payload == nil {
+		return
+	}
+	a.observeResponsePayload(payload)
+}
+
+// observeResponsePayload walks response.candidates.0.content.parts and records a
+// replay item for each functionCall part and for each other part that carries a
+// thought signature. Every part consumes one part index, including function
+// calls that are skipped because their key was already seen.
+func (a *antigravityReasoningReplayAccumulator) observeResponsePayload(payload []byte) {
+	parts := gjson.GetBytes(payload, "response.candidates.0.content.parts")
+	if !parts.IsArray() {
+		return
+	}
+	parts.ForEach(func(_, part gjson.Result) bool {
+		pi := a.nextPartIndex
+		a.nextPartIndex++
+		sig := antigravityNativePartThoughtSignature(part)
+		if fc := part.Get("functionCall"); fc.Exists() {
+			keys := antigravityReplayToolCallKeysFromPart(fc)
+			for _, k := range keys {
+				if a.seenFC[k] {
+					return true
+				}
+			}
+			for _, k := range keys {
+				a.seenFC[k] = true
+			}
+			item := buildAntigravityFunctionCallPartItem(a.contentIndex, pi, fc, sig)
+			if len(item) > 0 {
+				a.items = append(a.items, item)
+			}
+			return true
+		}
+		if sig != "" {
+			// Same guard as the function-call branch: never queue an empty item.
+			if item := buildAntigravityThoughtSignatureItem(a.contentIndex, pi, sig); len(item) > 0 {
+				a.items = append(a.items, item)
+			}
+		}
+		return true
+	})
+}
+
+// buildAntigravityThoughtSignatureItem encodes a thought_signature replay item.
+// It returns nil if the signature cannot be encoded.
+func buildAntigravityThoughtSignatureItem(contentIndex, partIndex int, signature string) []byte {
+	// Encode with encoding/json rather than fmt's %q: %q emits Go string syntax,
+	// whose \x.., \a, \v and \U........ escapes are not valid JSON.
+	quoted, err := json.Marshal(signature)
+	if err != nil {
+		return nil
+	}
+	return []byte(fmt.Sprintf(`{"type":"thought_signature","thoughtSignature":%s,"contentIndex":%d,"partIndex":%d}`,
+		quoted, contentIndex, partIndex))
+}
+
+// buildAntigravityFunctionCallPartItem encodes a function_call_part replay item
+// from a native functionCall object. The call id, args and signature are included
+// only when present. It returns nil if JSON encoding fails.
+func buildAntigravityFunctionCallPartItem(contentIndex, partIndex int, fc gjson.Result, signature string) []byte {
+	item := map[string]any{
+		"type":         "function_call_part",
+		"contentIndex": contentIndex,
+		"partIndex":    partIndex,
+		"name":         fc.Get("name").String(),
+	}
+	if id := strings.TrimSpace(fc.Get("id").String()); id != "" {
+		item["call_id"] = id
+	}
+	if args := fc.Get("args"); args.Exists() {
+		if args.Type == gjson.String {
+			item["args"] = args.String()
+		} else {
+			item["args"] = json.RawMessage(args.Raw)
+		}
+	}
+	if signature != "" {
+		item["thoughtSignature"] = signature
+	}
+	raw, err := json.Marshal(item)
+	if err != nil {
+		return nil
+	}
+	return raw
+}
+
+// Flush writes the collected items to the replay cache. If the write is not
+// accepted, the existing entry for the scope is deleted (that deletion's error is
+// ignored). It is a no-op for a nil accumulator, an invalid scope or no items.
+func (a *antigravityReasoningReplayAccumulator) Flush(ctx context.Context) {
+	if a == nil || !a.scope.valid() || len(a.items) == 0 {
+		return
+	}
+	if !internalcache.CacheAntigravityReasoningReplayItemsBestEffort(ctx, a.scope.modelName, a.scope.sessionKey, a.items) {
+		_ = internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, a.scope.modelName, a.scope.sessionKey)
+	}
+}
+
+// cacheAntigravityReasoningReplayFromResponse records and caches the replay items
+// of a complete (non-streamed) response body. It does nothing for an invalid
+// scope or an empty body.
+func cacheAntigravityReasoningReplayFromResponse(ctx context.Context, scope antigravityReasoningReplayScope, requestPayload, body []byte) {
+	if !scope.valid() || len(body) == 0 {
+		return
+	}
+	acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload)
+	acc.observeResponsePayload(body)
+	acc.Flush(ctx)
+}
+
+// applyAntigravityNativeSignatureReplayIfNeeded currently returns payload
+// unchanged on every path; it exists as the hook buildRequest calls.
+func applyAntigravityNativeSignatureReplayIfNeeded(modelName string, payload []byte) []byte {
+	if antigravityUsesReasoningReplayCache(modelName) {
+		return payload
+	}
+	// Native per-part signature replay is not on upstream/dev; Gemini uses HOME replay only.
+	return payload
+}
+
+// antigravityUsesReasoningReplayCache reports whether modelName uses the replay
+// cache: any name containing "gemini", "flash" or "agent" (case-insensitive),
+// except names containing "claude".
+func antigravityUsesReasoningReplayCache(modelName string) bool {
+	modelName = strings.ToLower(modelName)
+	if strings.Contains(modelName, "claude") {
+		return false
+	}
+	return strings.Contains(modelName, "gemini") || strings.Contains(modelName, "flash") || strings.Contains(modelName, "agent")
+}
+
+// antigravityNativePartThoughtSignature returns the first non-blank signature
+// found on a response part under thoughtSignature, thought_signature or
+// extra_content.google.thought_signature.
+func antigravityNativePartThoughtSignature(part gjson.Result) string {
+	for _, path := range []string{"thoughtSignature", "thought_signature", "extra_content.google.thought_signature"} {
+		if signature := strings.TrimSpace(part.Get(path).String()); signature != "" {
+			return signature
+		}
+	}
+	return ""
+}
diff --git a/internal/runtime/executor/antigravity_reasoning_replay_test.go b/internal/runtime/executor/antigravity_reasoning_replay_test.go
new file mode 100644
index 000000000..17d3f892a
--- /dev/null
+++ b/internal/runtime/executor/antigravity_reasoning_replay_test.go
@@ -0,0 +1,72 @@
+package executor
+
+import (
+	"context"
+	"testing"
+
+	internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
+	cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
+	"github.com/tidwall/gjson"
+)
+
+func TestAntigravityReasoningReplayAccumulatorMultiToolSSEChunks(t *testing.T) {
+	internalcache.ClearAntigravityReasoningReplayCache()
+	t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
+
+	requestPayload := []byte(`{"sessionId":"sess-1","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`)
+	scope := antigravityReasoningReplayScope{modelName: "gemini-3-flash-agent", sessionKey: "session:sess-1"}
+	acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload)
+	if acc == nil {
+		t.Fatal("accumulator is nil")
+	}
+	if acc.contentIndex != 1 || acc.nextPartIndex != 0 {
+		t.Fatalf("pending model slot = %d/%d, want 1/0", acc.contentIndex, acc.nextPartIndex)
+	}
+
+	line1 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"thoughtSignature":"sig-first","functionCall":{"name":"Read","args":{"file_path":"/a"},"id":"id1"}}]}}]}}`)
+	line2 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"functionCall":{"name":"Read","args":{"file_path":"/b"},"id":"id2"}}]}}]}}`)
+	acc.ObserveSSELine(line1)
+	acc.ObserveSSELine(line2)
+	acc.Flush(context.Background())
+
+	items, ok := internalcache.GetAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-1")
+	if !ok || len(items) != 2 {
+		t.Fatalf("cached items = %v ok=%v, want 2 items", len(items), ok)
+	}
+	pi0 := int(gjson.GetBytes(items[0], "partIndex").Int())
+	pi1 := int(gjson.GetBytes(items[1], "partIndex").Int())
+	if pi0 != 0 || pi1 != 1 {
+		t.Fatalf("partIndex = %d,%d, want 0,1", pi0, pi1)
+	}
+	if got := gjson.GetBytes(items[0], "thoughtSignature").String(); got != "sig-first" {
+		t.Fatalf("first sig = %q", got)
+	}
+}
+
+func TestPrepareAntigravityGeminiReasoningReplayPayloadInjectsCachedToolPart(t *testing.T) {
+	internalcache.ClearAntigravityReasoningReplayCache()
+	t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
+
+	item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`)
+	if !internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-2", [][]byte{item}) {
+		t.Fatal("cache write failed")
+	}
+
+	req := cliproxyexecutor.Request{}
+	opts := cliproxyexecutor.Options{}
+	payload := []byte(`{"sessionId":"sess-2","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`)
+	out, scope, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", req, opts, payload)
+	if err != nil {
+		t.Fatalf("prepare error: %v", err)
+	}
+	if !scope.valid() {
+		t.Fatal("scope
invalid") + } + path := "request.contents.1.parts.0.thoughtSignature" + if got := gjson.GetBytes(out, path).String(); got != "sig-first" { + t.Fatalf("%s = %q, want sig-first; body=%s", path, got, string(out)) + } + if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() { + t.Fatalf("functionCall not injected: %s", string(out)) + } +}