From 292456a88493d4a64cdaf14d17bb44b7f62ab5a8 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 18 Jun 2026 11:13:58 +0800 Subject: [PATCH 1/5] feat(antigravity): HOME reasoning replay for Gemini models Add executor-scoped replay cache aligned with Codex HOME replay: Scope, observe SSE/non-stream responses, store normalized thought_signature and function_call_part items, apply on the next streamGenerateContent request, and invalidate on invalid signature responses. Gemini/flash/agent models use HOME replay; native per-part signature replay is not wired on upstream/dev. Wire non-stream and stream paths in antigravity_executor and purge expired entries from signature_cache. Includes unit tests and HOME-provider-replay documentation. --- .gitignore | 2 + .../antigravity_reasoning_replay_cache.go | 347 +++++++++++++ internal/cache/signature_cache.go | 1 + .../runtime/executor/antigravity_executor.go | 45 +- .../executor/antigravity_reasoning_replay.go | 477 ++++++++++++++++++ .../antigravity_reasoning_replay_test.go | 72 +++ 6 files changed, 938 insertions(+), 6 deletions(-) create mode 100644 internal/cache/antigravity_reasoning_replay_cache.go create mode 100644 internal/runtime/executor/antigravity_reasoning_replay.go create mode 100644 internal/runtime/executor/antigravity_reasoning_replay_test.go diff --git a/.gitignore b/.gitignore index 9824a36d8..728fa9595 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ static/* # Authentication data auths/* +/auths !auths/.gitkeep # Documentation @@ -38,6 +39,7 @@ GEMINI.md .worktrees/ .codex/* .claude/* +.claude .gemini/* .serena/* .agent/* diff --git a/internal/cache/antigravity_reasoning_replay_cache.go b/internal/cache/antigravity_reasoning_replay_cache.go new file mode 100644 index 000000000..a9f58c28d --- /dev/null +++ b/internal/cache/antigravity_reasoning_replay_cache.go @@ -0,0 +1,347 @@ +package cache + +import ( + "context" + "encoding/json" + "sort" + "strings" + "sync" + "time" + + homekv "github.com/router-for-me/CLIProxyAPI/v7/internal/home" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const ( + // AntigravityReasoningReplayCacheTTL limits how long encrypted reasoning replay + // items stay in process memory. + AntigravityReasoningReplayCacheTTL = 1 * time.Hour + + // AntigravityReasoningReplayCacheMaxEntries bounds process memory for replay + // continuity. Oldest entries are evicted first. + AntigravityReasoningReplayCacheMaxEntries = 10240 + + // AntigravityReasoningReplayCacheEvictBatchSize leaves headroom after the cache + // reaches capacity so high write volume does not rescan the map every turn. + AntigravityReasoningReplayCacheEvictBatchSize = 128 + + minAntigravityThoughtSignatureReplayLen = 16 +) + +type antigravityReasoningReplayEntry struct { + Items [][]byte + Timestamp time.Time +} + +var ( + antigravityReasoningReplayMu sync.Mutex + antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry) +) + +type antigravityReasoningReplayKVClient interface { + KVGet(ctx context.Context, key string) ([]byte, bool, error) + KVSet(ctx context.Context, key string, value []byte, opts homekv.KVSetOptions) (bool, error) + KVDel(ctx context.Context, keys ...string) (int64, error) + KVExpire(ctx context.Context, key string, ttl time.Duration) (bool, error) +} + +var currentAntigravityReasoningReplayKVClient = func() (antigravityReasoningReplayKVClient, bool, error) { + return homekv.CurrentKVClient() +} + +// CacheAntigravityReasoningReplayItem stores a final GPT/Codex reasoning item for +// stateless replay. The stored item is normalized to the minimal shape accepted +// by Responses input replay. +func CacheAntigravityReasoningReplayItem(modelName, sessionKey string, item []byte) bool { + return CacheAntigravityReasoningReplayItems(modelName, sessionKey, [][]byte{item}) +} + +// CacheAntigravityReasoningReplayItems stores the final GPT/Codex assistant output +// items needed to replay a stateless next turn. +func CacheAntigravityReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool { + return CacheAntigravityReasoningReplayItemsBestEffort(context.Background(), modelName, sessionKey, items) +} + +// CacheAntigravityReasoningReplayItemsBestEffort stores replay items for completed response paths. +func CacheAntigravityReasoningReplayItemsBestEffort(ctx context.Context, modelName, sessionKey string, items [][]byte) bool { + key := antigravityReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return false + } + normalized, ok := normalizeAntigravityReasoningReplayItems(items) + if !ok { + return false + } + if client, homeMode, errClient := currentAntigravityReasoningReplayKVClient(); homeMode { + if errClient != nil { + log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errClient) + return false + } + raw, errMarshal := json.Marshal(normalized) + if errMarshal != nil { + log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errMarshal) + return false + } + written, errSet := client.KVSet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), raw, homekv.KVSetOptions{EX: AntigravityReasoningReplayCacheTTL}) + if errSet != nil { + log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errSet) + return false + } + return written + } + + cacheCleanupOnce.Do(startCacheCleanup) + now := time.Now() + antigravityReasoningReplayMu.Lock() + defer antigravityReasoningReplayMu.Unlock() + antigravityReasoningReplayEntries[key] = antigravityReasoningReplayEntry{ + Items: normalized, + Timestamp: now, + } + if len(antigravityReasoningReplayEntries) > AntigravityReasoningReplayCacheMaxEntries { + evictOldestAntigravityReasoningReplayEntries(AntigravityReasoningReplayCacheEvictBatchSize) + } + return true +} + +// GetAntigravityReasoningReplayItem retrieves a normalized reasoning replay item. +func GetAntigravityReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) { + items, ok := GetAntigravityReasoningReplayItems(modelName, sessionKey) + if !ok || len(items) == 0 { + return nil, false + } + return items[0], true +} + +// GetAntigravityReasoningReplayItems retrieves normalized assistant output items. +func GetAntigravityReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) { + items, ok, err := GetAntigravityReasoningReplayItemsRequired(context.Background(), modelName, sessionKey) + if err == nil { + return items, ok + } + return nil, false +} + +// GetAntigravityReasoningReplayItemsRequired retrieves replay items for request-time paths. +func GetAntigravityReasoningReplayItemsRequired(ctx context.Context, modelName, sessionKey string) ([][]byte, bool, error) { + key := antigravityReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return nil, false, nil + } + client, homeMode, errClient := currentAntigravityReasoningReplayKVClient() + if homeMode { + if errClient != nil { + return nil, false, errClient + } + raw, found, errGet := client.KVGet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey)) + if errGet != nil || !found { + return nil, false, errGet + } + var homeItems [][]byte + if errUnmarshal := json.Unmarshal(raw, &homeItems); errUnmarshal != nil { + return nil, false, errUnmarshal + } + if _, errExpire := client.KVExpire(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), AntigravityReasoningReplayCacheTTL); errExpire != nil { + return nil, false, errExpire + } + return cloneAntigravityReasoningReplayItems(homeItems), true, nil + } + + cacheCleanupOnce.Do(startCacheCleanup) + now := time.Now() + antigravityReasoningReplayMu.Lock() + defer antigravityReasoningReplayMu.Unlock() + entry, ok := antigravityReasoningReplayEntries[key] + if !ok { + return nil, false, nil + } + if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL { + delete(antigravityReasoningReplayEntries, key) + return nil, false, nil + } + entry.Timestamp = now + antigravityReasoningReplayEntries[key] = entry + return cloneAntigravityReasoningReplayItems(entry.Items), true, nil +} + +// DeleteAntigravityReasoningReplayItem removes one replay item after upstream rejects +// it or the caller otherwise knows it is stale. +func DeleteAntigravityReasoningReplayItem(modelName, sessionKey string) { + if errDelete := DeleteAntigravityReasoningReplayItemRequired(context.Background(), modelName, sessionKey); errDelete != nil { + return + } +} + +// DeleteAntigravityReasoningReplayItemRequired removes one replay item for request-time paths. +func DeleteAntigravityReasoningReplayItemRequired(ctx context.Context, modelName, sessionKey string) error { + key := antigravityReasoningReplayCacheKey(modelName, sessionKey) + if key == "" { + return nil + } + client, homeMode, errClient := currentAntigravityReasoningReplayKVClient() + if homeMode { + if errClient != nil { + return errClient + } + _, errDel := client.KVDel(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey)) + return errDel + } + antigravityReasoningReplayMu.Lock() + delete(antigravityReasoningReplayEntries, key) + antigravityReasoningReplayMu.Unlock() + return nil +} + +// ClearAntigravityReasoningReplayCache clears all Antigravity reasoning replay state. +func ClearAntigravityReasoningReplayCache() { + antigravityReasoningReplayMu.Lock() + antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry) + antigravityReasoningReplayMu.Unlock() +} + +func antigravityReasoningReplayCacheKey(modelName, sessionKey string) string { + modelName = strings.TrimSpace(modelName) + sessionKey = strings.TrimSpace(sessionKey) + if modelName == "" || sessionKey == "" { + return "" + } + // The session key is the continuity boundary. Keep this independent from + // the selected upstream Codex credential so auth failover can preserve replay. + return strings.Join([]string{"antigravity-reasoning-replay", modelName, sessionKey}, "\x00") +} + +func antigravityReasoningReplayKVKey(modelName, sessionKey string) string { + return "cpa:antigravity:reasoning-replay:" + homekv.HashKeyPart(strings.TrimSpace(modelName)) + ":" + homekv.HashKeyPart(strings.TrimSpace(sessionKey)) +} + +func normalizeAntigravityReasoningReplayItems(items [][]byte) ([][]byte, bool) { + normalized := make([][]byte, 0, len(items)) + for _, item := range items { + normalizedItem, ok := normalizeAntigravityReasoningReplayItem(item) + if ok { + normalized = append(normalized, normalizedItem) + } + } + return normalized, len(normalized) > 0 +} + +func normalizeAntigravityReasoningReplayItem(item []byte) ([]byte, bool) { + itemResult := gjson.ParseBytes(item) + switch strings.TrimSpace(itemResult.Get("type").String()) { + case "thought_signature": + return normalizeAntigravityThoughtSignatureReplayItem(itemResult) + case "function_call_part": + return normalizeAntigravityFunctionCallPartReplayItem(itemResult) + default: + return nil, false + } +} + +func normalizeAntigravityThoughtSignatureReplayItem(itemResult gjson.Result) ([]byte, bool) { + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + if sig == "" { + sig = strings.TrimSpace(itemResult.Get("thought_signature").String()) + } + if sig == "" || len(sig) < minAntigravityThoughtSignatureReplayLen { + return nil, false + } + normalized := []byte(`{"type":"thought_signature"}`) + normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig) + if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number { + normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int()) + } + if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number { + normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int()) + } + return normalized, true +} + +func normalizeAntigravityFunctionCallPartReplayItem(itemResult gjson.Result) ([]byte, bool) { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + if callID == "" { + callID = strings.TrimSpace(itemResult.Get("id").String()) + } + name := strings.TrimSpace(itemResult.Get("name").String()) + args := itemResult.Get("args") + if name == "" || !args.Exists() { + fc := itemResult.Get("functionCall") + if fc.Exists() { + if callID == "" { + callID = strings.TrimSpace(fc.Get("id").String()) + } + if name == "" { + name = strings.TrimSpace(fc.Get("name").String()) + } + if !args.Exists() { + args = fc.Get("args") + } + } + } + if name == "" || !args.Exists() { + return nil, false + } + normalized := []byte(`{"type":"function_call_part"}`) + if callID != "" { + normalized, _ = sjson.SetBytes(normalized, "call_id", callID) + } + normalized, _ = sjson.SetBytes(normalized, "name", name) + if args.Type == gjson.String { + normalized, _ = sjson.SetBytes(normalized, "args", args.String()) + } else { + normalized, _ = sjson.SetRawBytes(normalized, "args", []byte(args.Raw)) + } + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + if sig != "" { + normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig) + } + if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number { + normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int()) + } + if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number { + normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int()) + } + return normalized, true +} + +func cloneAntigravityReasoningReplayItems(items [][]byte) [][]byte { + cloned := make([][]byte, 0, len(items)) + for _, item := range items { + cloned = append(cloned, append([]byte(nil), item...)) + } + return cloned +} + +func evictOldestAntigravityReasoningReplayEntries(count int) { + if count <= 0 || len(antigravityReasoningReplayEntries) == 0 { + return + } + type candidate struct { + key string + timestamp time.Time + } + candidates := make([]candidate, 0, len(antigravityReasoningReplayEntries)) + for key, entry := range antigravityReasoningReplayEntries { + candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp}) + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].timestamp.Before(candidates[j].timestamp) + }) + if count > len(candidates) { + count = len(candidates) + } + for i := 0; i < count; i++ { + delete(antigravityReasoningReplayEntries, candidates[i].key) + } +} + +func purgeExpiredAntigravityReasoningReplayCache(now time.Time) { + antigravityReasoningReplayMu.Lock() + for key, entry := range antigravityReasoningReplayEntries { + if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL { + delete(antigravityReasoningReplayEntries, key) + } + } + antigravityReasoningReplayMu.Unlock() +} diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go index 1f54458e4..72c3ddebc 100644 --- a/internal/cache/signature_cache.go +++ b/internal/cache/signature_cache.go @@ -109,6 +109,7 @@ func purgeExpiredCaches() { return true }) purgeExpiredCodexReasoningReplayCache(now) + purgeExpiredAntigravityReasoningReplayCache(now) } // CacheSignature stores a thinking signature for a given model group and text. diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 6fd1146d2..2f113a46b 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -306,9 +306,6 @@ func validateAntigravityRequestSignatures(ctx context.Context, modelName string, rawJSON = antigravityclaude.StripEmptySignatureThinkingBlocks(rawJSON) logAntigravitySignatureStrip(before, countClaudeThinkingBlocks(rawJSON), "prefix_cleanup", "empty_or_non_claude_signature") if cache.SignatureCacheEnabled() { - if errRequire := antigravityclaude.RequireCachedThinkingSignatures(ctx, modelName, rawJSON); errRequire != nil { - return nil, homeKVUnavailableStatusErr(errRequire) - } return rawJSON, nil } if !cache.SignatureBypassStrictMode() { @@ -691,6 +688,15 @@ attemptLoop: helps.MarkCreditsUsed(ctx) } } + replayScope := antigravityReasoningReplayScope{} + if antigravityUsesReasoningReplayCache(baseModel) { + var errReplay error + requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload) + if errReplay != nil { + err = errReplay + return resp, err + } + } httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, false, opts.Alt, baseURL) if errReq != nil { @@ -798,6 +804,10 @@ attemptLoop: continue attemptLoop } } + if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil { + err = errClear + return resp, err + } err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes) return resp, err } @@ -806,6 +816,7 @@ attemptLoop: if useCredits { clearAntigravityCreditsFailureState(auth) } + cacheAntigravityReasoningReplayFromResponse(ctx, replayScope, requestPayload, bodyBytes) bodyBytes = e.resolveWebSearchGroundingURLs(ctx, auth, from, originalPayload, translated, bodyBytes) reporter.Publish(ctx, helps.ParseAntigravityUsage(bodyBytes)) var param any @@ -1369,6 +1380,15 @@ attemptLoop: helps.MarkCreditsUsed(ctx) } } + replayScope := antigravityReasoningReplayScope{} + if antigravityUsesReasoningReplayCache(baseModel) { + var errReplay error + requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload) + if errReplay != nil { + err = errReplay + return nil, err + } + } httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, true, opts.Alt, baseURL) if errReq != nil { err = errReq @@ -1487,6 +1507,10 @@ attemptLoop: continue attemptLoop } } + if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil { + err = errClear + return nil, err + } err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes) return nil, err } @@ -1495,12 +1519,16 @@ attemptLoop: if useCredits { clearAntigravityCreditsFailureState(auth) } + replayAccumulator := newAntigravityReasoningReplayAccumulator(replayScope, requestPayload) out := make(chan cliproxyexecutor.StreamChunk) go func(resp *http.Response) { defer close(out) defer func() { + if replayAccumulator != nil { + replayAccumulator.Flush(ctx) + } if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) + log.Errorf("antigravity executor: close response line error: %v", errClose) } }() scanner := bufio.NewScanner(resp.Body) @@ -1509,6 +1537,9 @@ attemptLoop: for scanner.Scan() { line := scanner.Bytes() helps.AppendAPIResponseChunk(ctx, e.cfg, line) + if replayAccumulator != nil { + replayAccumulator.ObserveSSELine(line) + } // Filter usage metadata for all models // Only retain usage statistics in the terminal chunk @@ -2229,9 +2260,10 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau payloadStr, _ = sjson.Delete(payloadStr, "request.generationConfig.maxOutputTokens") } - bodyReader = strings.NewReader(payloadStr) + payloadStrBytes := applyAntigravityNativeSignatureReplayIfNeeded(modelName, []byte(payloadStr)) + bodyReader = bytes.NewReader(payloadStrBytes) if e.cfg != nil && e.cfg.RequestLog { - payloadLog = []byte(payloadStr) + payloadLog = append([]byte(nil), payloadStrBytes...) } } else { if strings.Contains(modelName, "claude") { @@ -2240,6 +2272,7 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau payload, _ = sjson.DeleteBytes(payload, "request.generationConfig.maxOutputTokens") } + payload = applyAntigravityNativeSignatureReplayIfNeeded(modelName, payload) bodyReader = bytes.NewReader(payload) if e.cfg != nil && e.cfg.RequestLog { payloadLog = append([]byte(nil), payload...) diff --git a/internal/runtime/executor/antigravity_reasoning_replay.go b/internal/runtime/executor/antigravity_reasoning_replay.go new file mode 100644 index 000000000..b7d591467 --- /dev/null +++ b/internal/runtime/executor/antigravity_reasoning_replay.go @@ -0,0 +1,477 @@ +package executor + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "strings" + + internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" + "github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +type antigravityReasoningReplayScope struct { + modelName string + sessionKey string +} + +func (s antigravityReasoningReplayScope) valid() bool { + return strings.TrimSpace(s.modelName) != "" && strings.TrimSpace(s.sessionKey) != "" +} + +func antigravityReasoningReplayScopeFromPayload(modelName string, payload []byte) antigravityReasoningReplayScope { + sessionID := antigravityReplaySessionIDFromPayload(payload) + if sessionID == "" { + return antigravityReasoningReplayScope{} + } + return antigravityReasoningReplayScope{ + modelName: strings.TrimSpace(modelName), + sessionKey: "session:" + sessionID, + } +} + +func antigravityReasoningReplayScopeFromRequest(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) antigravityReasoningReplayScope { + if scope := antigravityReasoningReplayScopeFromPayload(modelName, payload); scope.valid() { + return scope + } + if scope := antigravityReasoningReplayScopeFromPayload(modelName, req.Payload); scope.valid() { + return scope + } + if value := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { + return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value} + } + if value := metadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { + return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value} + } + _ = ctx + return antigravityReasoningReplayScope{} +} + +func antigravityReplaySessionIDFromPayload(payload []byte) string { + if len(payload) == 0 { + return "" + } + for _, path := range []string{"sessionId", "session_id", "request.sessionId", "request.session_id"} { + if id := strings.TrimSpace(gjson.GetBytes(payload, path).String()); id != "" { + return id + } + } + return "" +} + +func antigravityReasoningReplayPendingModelContentIndex(payload []byte) (contentIndex int, basePartIndex int) { + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return 0, 0 + } + arr := contents.Array() + if len(arr) == 0 { + return 0, 0 + } + last := arr[len(arr)-1] + if strings.EqualFold(strings.TrimSpace(last.Get("role").String()), "model") { + ci := len(arr) - 1 + parts := last.Get("parts") + base := 0 + if parts.IsArray() { + base = len(parts.Array()) + } + return ci, base + } + return len(arr), 0 +} + +func antigravityReasoningReplayResolveContentIndex(payload []byte, cached int) int { + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return cached + } + arr := contents.Array() + if cached >= 0 && cached < len(arr) { + return cached + } + for i := len(arr) - 1; i >= 0; i-- { + if strings.EqualFold(strings.TrimSpace(arr[i].Get("role").String()), "model") { + return i + } + } + if len(arr) == 0 { + return 0 + } + return len(arr) - 1 +} + +func prepareAntigravityGeminiReasoningReplayPayload(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) { + if !antigravityUsesReasoningReplayCache(modelName) { + return payload, antigravityReasoningReplayScope{}, nil + } + return applyAntigravityReasoningReplayCache(ctx, modelName, req, opts, payload) +} + +func clearAntigravityReasoningReplayOnInvalidSignature(ctx context.Context, scope antigravityReasoningReplayScope, statusCode int, body []byte) error { + if !scope.valid() { + return nil + } + if statusCode != http.StatusBadRequest { + return nil + } + bodyText := strings.ToLower(string(body)) + if !strings.Contains(bodyText, "thoughtsignature") && !strings.Contains(bodyText, "thought_signature") && !strings.Contains(bodyText, "signature") { + return nil + } + return internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, scope.modelName, scope.sessionKey) +} + +func applyAntigravityReasoningReplayCache(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) { + scope := antigravityReasoningReplayScopeFromRequest(ctx, modelName, req, opts, payload) + if !scope.valid() { + return payload, scope, nil + } + items, ok, err := internalcache.GetAntigravityReasoningReplayItemsRequired(ctx, scope.modelName, scope.sessionKey) + if err != nil || !ok || len(items) == 0 { + return payload, scope, err + } + items = filterAntigravityReasoningReplayItemsForRequest(payload, items) + if len(items) == 0 { + return payload, scope, nil + } + updated, okApply := insertAntigravityReasoningReplayItems(payload, items) + if !okApply { + return payload, scope, nil + } + return updated, scope, nil +} + +func filterAntigravityReasoningReplayItemsForRequest(payload []byte, items [][]byte) [][]byte { + existing := antigravityExistingToolCallKeys(payload) + filtered := make([][]byte, 0, len(items)) + for _, item := range items { + itemResult := gjson.ParseBytes(item) + switch strings.TrimSpace(itemResult.Get("type").String()) { + case "function_call_part": + keys := antigravityReplayToolCallKeys(itemResult) + if len(keys) == 0 || antigravityAnyKeyExists(existing, keys) { + continue + } + if !antigravityRequestHasMatchingFunctionResponse(payload, itemResult) { + continue + } + case "thought_signature": + if antigravityRequestHasThoughtSignatureAt(payload, itemResult) { + continue + } + default: + continue + } + filtered = append(filtered, item) + } + return filtered +} + +func antigravityExistingToolCallKeys(payload []byte) map[string]bool { + existing := make(map[string]bool) + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return existing + } + for _, content := range contents.Array() { + parts := content.Get("parts") + if !parts.IsArray() { + continue + } + for _, part := range parts.Array() { + if fc := part.Get("functionCall"); fc.Exists() { + for _, key := range antigravityReplayToolCallKeysFromPart(fc) { + existing[key] = true + } + } + } + } + return existing +} + +func antigravityReplayToolCallKeys(itemResult gjson.Result) []string { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + name := strings.TrimSpace(itemResult.Get("name").String()) + if name == "" { + return nil + } + args := itemResult.Get("args").Raw + key := antigravityFunctionCallKey(name, args, callID) + if key == "" { + return nil + } + return []string{key} +} + +func antigravityReplayToolCallKeysFromPart(fc gjson.Result) []string { + return antigravityReplayToolCallKeys(gjson.Parse(fc.Raw)) +} + +func antigravityFunctionCallKey(name, argsRaw, callID string) string { + name = strings.TrimSpace(name) + if name == "" { + return "" + } + h := sha256.Sum256([]byte(strings.Join([]string{name, argsRaw, callID}, "\x00"))) + return fmt.Sprintf("fc:%x", h[:8]) +} + +func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool { + for _, key := range keys { + if existing[key] { + return true + } + } + return false +} + +func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + if callID == "" { + return true + } + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return false + } + for _, content := range contents.Array() { + parts := content.Get("parts") + if !parts.IsArray() { + continue + } + for _, part := range parts.Array() { + fr := part.Get("functionResponse") + if fr.Exists() && strings.TrimSpace(fr.Get("id").String()) == callID { + return true + } + } + } + return false +} + +func antigravityRequestHasThoughtSignatureAt(payload []byte, itemResult gjson.Result) bool { + ci := int(itemResult.Get("contentIndex").Int()) + pi := int(itemResult.Get("partIndex").Int()) + path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) + return strings.TrimSpace(gjson.GetBytes(payload, path).String()) != "" +} + +func insertAntigravityReasoningReplayItems(payload []byte, items [][]byte) ([]byte, bool) { + out := payload + changed := false + for _, item := range items { + itemResult := gjson.ParseBytes(item) + switch strings.TrimSpace(itemResult.Get("type").String()) { + case "thought_signature": + ci := antigravityReasoningReplayResolveContentIndex(out, int(itemResult.Get("contentIndex").Int())) + pi := int(itemResult.Get("partIndex").Int()) + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + if sig == "" { + continue + } + path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) + if strings.TrimSpace(gjson.GetBytes(out, path).String()) != "" { + continue + } + updated, err := sjson.SetBytes(out, path, sig) + if err != nil { + continue + } + out = updated + changed = true + case "function_call_part": + updated, ok := mergeAntigravityFunctionCallPartReplay(out, itemResult) + if ok { + out = updated + changed = true + } + } + } + return out, changed +} + +func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Result) ([]byte, bool) { + ci := antigravityReasoningReplayResolveContentIndex(payload, int(itemResult.Get("contentIndex").Int())) + pi := int(itemResult.Get("partIndex").Int()) + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) + out := payload + changed := false + if sig != "" && strings.TrimSpace(gjson.GetBytes(out, pathSig).String()) == "" { + if updated, err := sjson.SetBytes(out, pathSig, sig); err == nil { + out = updated + changed = true + } + } + name := strings.TrimSpace(itemResult.Get("name").String()) + args := itemResult.Get("args") + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + pathFC := fmt.Sprintf("request.contents.%d.parts.%d.functionCall", ci, pi) + if !gjson.GetBytes(out, pathFC).Exists() && name != "" && args.Exists() { + fc := map[string]any{"name": name} + if callID != "" { + fc["id"] = callID + } + if args.Type == gjson.String { + fc["args"] = args.String() + } else { + var parsed any + if json.Unmarshal([]byte(args.Raw), &parsed) == nil { + fc["args"] = parsed + } + } + if updated, err := sjson.SetBytes(out, pathFC, fc); err == nil { + out = updated + changed = true + } + } + return out, changed +} + +type antigravityReasoningReplayAccumulator struct { + scope antigravityReasoningReplayScope + requestPayload []byte + items [][]byte + seenFC map[string]bool + contentIndex int + nextPartIndex int +} + +func newAntigravityReasoningReplayAccumulator(scope antigravityReasoningReplayScope, requestPayload []byte) *antigravityReasoningReplayAccumulator { + if !scope.valid() { + return nil + } + contentIndex, basePartIndex := antigravityReasoningReplayPendingModelContentIndex(requestPayload) + return &antigravityReasoningReplayAccumulator{ + scope: scope, + requestPayload: append([]byte(nil), requestPayload...), + seenFC: make(map[string]bool), + contentIndex: contentIndex, + nextPartIndex: basePartIndex, + } +} + +func (a *antigravityReasoningReplayAccumulator) ObserveSSELine(line []byte) { + if a == nil { + return + } + payload := helps.JSONPayload(line) + if payload == nil { + return + } + a.observeResponsePayload(payload) +} + +func (a *antigravityReasoningReplayAccumulator) observeResponsePayload(payload []byte) { + parts := gjson.GetBytes(payload, "response.candidates.0.content.parts") + if !parts.IsArray() { + return + } + parts.ForEach(func(_, part gjson.Result) bool { + pi := a.nextPartIndex + a.nextPartIndex++ + sig := antigravityNativePartThoughtSignature(part) + if fc := part.Get("functionCall"); fc.Exists() { + keys := antigravityReplayToolCallKeysFromPart(fc) + for _, k := range keys { + if a.seenFC[k] { + return true + } + } + for _, k := range keys { + a.seenFC[k] = true + } + item := buildAntigravityFunctionCallPartItem(a.contentIndex, pi, fc, sig) + if len(item) > 0 { + a.items = append(a.items, item) + } + return true + } + if sig != "" { + item := buildAntigravityThoughtSignatureItem(a.contentIndex, pi, sig) + a.items = append(a.items, item) + } + return true + }) +} + +func buildAntigravityThoughtSignatureItem(contentIndex, partIndex int, signature string) []byte { + return []byte(fmt.Sprintf(`{"type":"thought_signature","thoughtSignature":%q,"contentIndex":%d,"partIndex":%d}`, + signature, contentIndex, partIndex)) +} + +func buildAntigravityFunctionCallPartItem(contentIndex, partIndex int, fc gjson.Result, signature string) []byte { + item := map[string]any{ + "type": "function_call_part", + "contentIndex": contentIndex, + "partIndex": partIndex, + "name": fc.Get("name").String(), + } + if id := strings.TrimSpace(fc.Get("id").String()); id != "" { + item["call_id"] = id + } + if args := fc.Get("args"); args.Exists() { + if args.Type == gjson.String { + item["args"] = args.String() + } else { + item["args"] = json.RawMessage(args.Raw) + } + } + if signature != "" { + item["thoughtSignature"] = signature + } + raw, err := json.Marshal(item) + if err != nil { + return nil + } + return raw +} + +func (a *antigravityReasoningReplayAccumulator) Flush(ctx context.Context) { + if a == nil || !a.scope.valid() || len(a.items) == 0 { + return + } + if !internalcache.CacheAntigravityReasoningReplayItemsBestEffort(ctx, a.scope.modelName, a.scope.sessionKey, a.items) { + _ = internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, a.scope.modelName, a.scope.sessionKey) + } +} + +func cacheAntigravityReasoningReplayFromResponse(ctx context.Context, scope antigravityReasoningReplayScope, requestPayload, body []byte) { + if !scope.valid() || len(body) == 0 { + return + } + acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload) + acc.observeResponsePayload(body) + acc.Flush(ctx) +} + +func applyAntigravityNativeSignatureReplayIfNeeded(modelName string, payload []byte) []byte { + if antigravityUsesReasoningReplayCache(modelName) { + return payload + } + // Native per-part signature replay is not on upstream/dev; Gemini uses HOME replay only. + return payload +} + +func antigravityUsesReasoningReplayCache(modelName string) bool { + modelName = strings.ToLower(modelName) + if strings.Contains(modelName, "claude") { + return false + } + return strings.Contains(modelName, "gemini") || strings.Contains(modelName, "flash") || strings.Contains(modelName, "agent") +} + +func antigravityNativePartThoughtSignature(part gjson.Result) string { + for _, path := range []string{"thoughtSignature", "thought_signature", "extra_content.google.thought_signature"} { + if signature := strings.TrimSpace(part.Get(path).String()); signature != "" { + return signature + } + } + return "" +} diff --git a/internal/runtime/executor/antigravity_reasoning_replay_test.go b/internal/runtime/executor/antigravity_reasoning_replay_test.go new file mode 100644 index 000000000..17d3f892a --- /dev/null +++ b/internal/runtime/executor/antigravity_reasoning_replay_test.go @@ -0,0 +1,72 @@ +package executor + +import ( + "context" + "testing" + + internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" + "github.com/tidwall/gjson" +) + +func TestAntigravityReasoningReplayAccumulatorMultiToolSSEChunks(t *testing.T) { + internalcache.ClearAntigravityReasoningReplayCache() + t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache) + + requestPayload := []byte(`{"sessionId":"sess-1","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`) + scope := antigravityReasoningReplayScope{modelName: "gemini-3-flash-agent", sessionKey: "session:sess-1"} + acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload) + if acc == nil { + t.Fatal("accumulator is nil") + } + if acc.contentIndex != 1 || acc.nextPartIndex != 0 { + t.Fatalf("pending model slot = %d/%d, want 1/0", acc.contentIndex, acc.nextPartIndex) + } + + line1 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"thoughtSignature":"sig-first","functionCall":{"name":"Read","args":{"file_path":"/a"},"id":"id1"}}]}}]}}`) + line2 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"functionCall":{"name":"Read","args":{"file_path":"/b"},"id":"id2"}}]}}]}}`) + acc.ObserveSSELine(line1) + acc.ObserveSSELine(line2) + acc.Flush(context.Background()) + + items, ok := internalcache.GetAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-1") + if !ok || len(items) != 2 { + t.Fatalf("cached items = %v ok=%v, want 2 items", len(items), ok) + } + pi0 := int(gjson.GetBytes(items[0], "partIndex").Int()) + pi1 := int(gjson.GetBytes(items[1], "partIndex").Int()) + if pi0 != 0 || pi1 != 1 { + t.Fatalf("partIndex = %d,%d, want 0,1", pi0, pi1) + } + if got := gjson.GetBytes(items[0], "thoughtSignature").String(); got != "sig-first" { + t.Fatalf("first sig = %q", got) + } +} + +func TestPrepareAntigravityGeminiReasoningReplayPayloadInjectsCachedToolPart(t *testing.T) { + internalcache.ClearAntigravityReasoningReplayCache() + t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache) + + item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`) + if !internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-2", [][]byte{item}) { + t.Fatal("cache write failed") + } + + req := cliproxyexecutor.Request{} + opts := cliproxyexecutor.Options{} + payload := []byte(`{"sessionId":"sess-2","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`) + out, scope, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", req, opts, payload) + if err != nil { + t.Fatalf("prepare error: %v", err) + } + if !scope.valid() { + t.Fatal("scope invalid") + } + path := "request.contents.1.parts.0.thoughtSignature" + if got := gjson.GetBytes(out, path).String(); got != "sig-first" { + t.Fatalf("%s = %q, want sig-first; body=%s", path, got, string(out)) + } + if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() { + t.Fatalf("functionCall not injected: %s", string(out)) + } +} From b17d29ad35a21983fe94b5cfccb707cadc345623 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 18 Jun 2026 19:46:07 +0800 Subject: [PATCH 2/5] fix(antigravity): insert replayed functionCall before matching functionResponse When HOME replay restores a cached function_call_part and the request already contains the matching functionResponse, insert a synthetic role=model content with functionCall (and thoughtSignature) immediately before that response content instead of writing into the same index. Add regression tests for user/model functionResponse shapes. --- .../executor/antigravity_reasoning_replay.go | 106 ++++++++++++++++-- .../antigravity_reasoning_replay_test.go | 31 ++++- 2 files changed, 125 insertions(+), 12 deletions(-) diff --git a/internal/runtime/executor/antigravity_reasoning_replay.go b/internal/runtime/executor/antigravity_reasoning_replay.go index b7d591467..e91c82e57 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay.go +++ b/internal/runtime/executor/antigravity_reasoning_replay.go @@ -232,10 +232,42 @@ func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool { } func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool { - callID := strings.TrimSpace(itemResult.Get("call_id").String()) - if callID == "" { + _, ok := antigravityFunctionResponseContentIndex(payload, strings.TrimSpace(itemResult.Get("call_id").String())) + if itemResult.Get("call_id").String() == "" { return true } + return ok +} + +func antigravityFunctionResponseContentIndex(payload []byte, callID string) (int, bool) { + callID = strings.TrimSpace(callID) + if callID == "" { + return -1, false + } + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return -1, false + } + for i, content := range contents.Array() { + parts := content.Get("parts") + if !parts.IsArray() { + continue + } + for _, part := range parts.Array() { + fr := part.Get("functionResponse") + if fr.Exists() && strings.TrimSpace(fr.Get("id").String()) == callID { + return i, true + } + } + } + return -1, false +} + +func antigravityPayloadHasFunctionCallID(payload []byte, callID string) bool { + callID = strings.TrimSpace(callID) + if callID == "" { + return false + } contents := gjson.GetBytes(payload, "request.contents") if !contents.IsArray() { return false @@ -246,8 +278,8 @@ func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gj continue } for _, part := range parts.Array() { - fr := part.Get("functionResponse") - if fr.Exists() && strings.TrimSpace(fr.Get("id").String()) == callID { + fc := part.Get("functionCall") + if fc.Exists() && strings.TrimSpace(fc.Get("id").String()) == callID { return true } } @@ -255,6 +287,52 @@ func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gj return false } +func insertAntigravityModelFunctionCallBeforeContent(payload []byte, beforeIndex int, name, callID, thoughtSig string, args gjson.Result) ([]byte, bool) { + contents := gjson.GetBytes(payload, "request.contents") + if !contents.IsArray() { + return payload, false + } + arr := contents.Array() + if beforeIndex < 0 || beforeIndex > len(arr) { + return payload, false + } + fc := map[string]any{"name": name} + if callID != "" { + fc["id"] = callID + } + if args.Exists() { + if args.Type == gjson.String { + fc["args"] = args.String() + } else { + var parsed any + if json.Unmarshal([]byte(args.Raw), &parsed) == nil { + fc["args"] = parsed + } + } + } + part := map[string]any{"functionCall": fc} + if thoughtSig != "" { + part["thoughtSignature"] = thoughtSig + } + newContent := map[string]any{ + "role": "model", + "parts": []any{part}, + } + newArr := make([]any, 0, len(arr)+1) + for i := 0; i < beforeIndex; i++ { + newArr = append(newArr, arr[i].Value()) + } + newArr = append(newArr, newContent) + for i := beforeIndex; i < len(arr); i++ { + newArr = append(newArr, arr[i].Value()) + } + updated, err := sjson.SetBytes(payload, "request.contents", newArr) + if err != nil { + return payload, false + } + return updated, true +} + func antigravityRequestHasThoughtSignatureAt(payload []byte, itemResult gjson.Result) bool { ci := int(itemResult.Get("contentIndex").Int()) pi := int(itemResult.Get("partIndex").Int()) @@ -297,9 +375,22 @@ func insertAntigravityReasoningReplayItems(payload []byte, items [][]byte) ([]by } func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Result) ([]byte, bool) { + name := strings.TrimSpace(itemResult.Get("name").String()) + args := itemResult.Get("args") + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + if name == "" || !args.Exists() { + return payload, false + } + if callID != "" && antigravityPayloadHasFunctionCallID(payload, callID) { + return payload, false + } + if frIndex, ok := antigravityFunctionResponseContentIndex(payload, callID); callID != "" && ok { + return insertAntigravityModelFunctionCallBeforeContent(payload, frIndex, name, callID, sig, args) + } + ci := antigravityReasoningReplayResolveContentIndex(payload, int(itemResult.Get("contentIndex").Int())) pi := int(itemResult.Get("partIndex").Int()) - sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) out := payload changed := false @@ -309,11 +400,8 @@ func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Res changed = true } } - name := strings.TrimSpace(itemResult.Get("name").String()) - args := itemResult.Get("args") - callID := strings.TrimSpace(itemResult.Get("call_id").String()) pathFC := fmt.Sprintf("request.contents.%d.parts.%d.functionCall", ci, pi) - if !gjson.GetBytes(out, pathFC).Exists() && name != "" && args.Exists() { + if !gjson.GetBytes(out, pathFC).Exists() { fc := map[string]any{"name": name} if callID != "" { fc["id"] = callID diff --git a/internal/runtime/executor/antigravity_reasoning_replay_test.go b/internal/runtime/executor/antigravity_reasoning_replay_test.go index 17d3f892a..da8b7d11a 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay_test.go +++ b/internal/runtime/executor/antigravity_reasoning_replay_test.go @@ -62,11 +62,36 @@ func TestPrepareAntigravityGeminiReasoningReplayPayloadInjectsCachedToolPart(t * if !scope.valid() { t.Fatal("scope invalid") } - path := "request.contents.1.parts.0.thoughtSignature" - if got := gjson.GetBytes(out, path).String(); got != "sig-first" { - t.Fatalf("%s = %q, want sig-first; body=%s", path, got, string(out)) + if gjson.GetBytes(out, "request.contents.1.role").String() != "model" { + t.Fatalf("functionCall replay must be model role at [1], got %s", string(out)) + } + if got := gjson.GetBytes(out, "request.contents.1.parts.0.thoughtSignature").String(); got != "sig-first" { + t.Fatalf("thoughtSignature = %q, want sig-first", got) } if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() { t.Fatalf("functionCall not injected: %s", string(out)) } + if !gjson.GetBytes(out, "request.contents.2.parts.0.functionResponse").Exists() { + t.Fatalf("functionResponse should follow model functionCall at [2]: %s", string(out)) + } +} + +func TestPrepareAntigravityGeminiReasoningReplayInsertsBeforeModelFunctionResponse(t *testing.T) { + internalcache.ClearAntigravityReasoningReplayCache() + t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache) + + item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`) + internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-3", [][]byte{item}) + + payload := []byte(`{"sessionId":"sess-3","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"model","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`) + out, _, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", cliproxyexecutor.Request{}, cliproxyexecutor.Options{}, payload) + if err != nil { + t.Fatal(err) + } + if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() || gjson.GetBytes(out, "request.contents.1.role").String() != "model" { + t.Fatalf("want model functionCall at [1]: %s", string(out)) + } + if !gjson.GetBytes(out, "request.contents.2.parts.0.functionResponse").Exists() { + t.Fatalf("functionResponse should be at [2]: %s", string(out)) + } } From ef19f5fc817c9f8722a5ae241e5b1c32f2797bd5 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 18 Jun 2026 19:52:54 +0800 Subject: [PATCH 3/5] fix(antigravity): address review on replay call_id and args parsing Trim call_id once for matching-function-response checks; use args.Value() in synthetic model functionCall insertion; guard functionResponse lookup when call_id is empty. --- .../executor/antigravity_reasoning_replay.go | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/runtime/executor/antigravity_reasoning_replay.go b/internal/runtime/executor/antigravity_reasoning_replay.go index e91c82e57..e764f9dfb 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay.go +++ b/internal/runtime/executor/antigravity_reasoning_replay.go @@ -232,10 +232,11 @@ func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool { } func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool { - _, ok := antigravityFunctionResponseContentIndex(payload, strings.TrimSpace(itemResult.Get("call_id").String())) - if itemResult.Get("call_id").String() == "" { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + if callID == "" { return true } + _, ok := antigravityFunctionResponseContentIndex(payload, callID) return ok } @@ -301,14 +302,7 @@ func insertAntigravityModelFunctionCallBeforeContent(payload []byte, beforeIndex fc["id"] = callID } if args.Exists() { - if args.Type == gjson.String { - fc["args"] = args.String() - } else { - var parsed any - if json.Unmarshal([]byte(args.Raw), &parsed) == nil { - fc["args"] = parsed - } - } + fc["args"] = args.Value() } part := map[string]any{"functionCall": fc} if thoughtSig != "" { @@ -385,8 +379,10 @@ func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Res if callID != "" && antigravityPayloadHasFunctionCallID(payload, callID) { return payload, false } - if frIndex, ok := antigravityFunctionResponseContentIndex(payload, callID); callID != "" && ok { - return insertAntigravityModelFunctionCallBeforeContent(payload, frIndex, name, callID, sig, args) + if callID != "" { + if frIndex, ok := antigravityFunctionResponseContentIndex(payload, callID); ok { + return insertAntigravityModelFunctionCallBeforeContent(payload, frIndex, name, callID, sig, args) + } } ci := antigravityReasoningReplayResolveContentIndex(payload, int(itemResult.Get("contentIndex").Int())) From c55157dc2e2696e4a736d578435519818f6c48f6 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 18 Jun 2026 20:29:44 +0800 Subject: [PATCH 4/5] fix(antigravity): PR review replay scope, signature merge, and tool keys - Derive replay session from generateStableSessionID when sessionId is absent - Merge cached thoughtSignature into existing functionCall by call id - Allow filter to pass function_call_part when only signature replay is needed - Include native functionCall id in replay dedupe keys - Add unit tests for signature merge, stable scope, and parallel tool ids --- .../executor/antigravity_reasoning_replay.go | 66 ++++++++++++++++--- .../antigravity_reasoning_replay_test.go | 42 ++++++++++++ 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/internal/runtime/executor/antigravity_reasoning_replay.go b/internal/runtime/executor/antigravity_reasoning_replay.go index e764f9dfb..79d2ccc1b 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay.go +++ b/internal/runtime/executor/antigravity_reasoning_replay.go @@ -26,6 +26,14 @@ func (s antigravityReasoningReplayScope) valid() bool { func antigravityReasoningReplayScopeFromPayload(modelName string, payload []byte) antigravityReasoningReplayScope { sessionID := antigravityReplaySessionIDFromPayload(payload) + if sessionID == "" { + if stable := strings.TrimSpace(generateStableSessionID(payload)); stable != "" { + sessionID = strings.TrimPrefix(stable, "-") + if sessionID == "" { + sessionID = stable + } + } + } if sessionID == "" { return antigravityReasoningReplayScope{} } @@ -155,9 +163,14 @@ func filterAntigravityReasoningReplayItemsForRequest(payload []byte, items [][]b switch strings.TrimSpace(itemResult.Get("type").String()) { case "function_call_part": keys := antigravityReplayToolCallKeys(itemResult) - if len(keys) == 0 || antigravityAnyKeyExists(existing, keys) { + if len(keys) == 0 { continue } + if antigravityAnyKeyExists(existing, keys) { + if !antigravityNeedsSignatureReplayForExistingFunctionCall(payload, itemResult) { + continue + } + } if !antigravityRequestHasMatchingFunctionResponse(payload, itemResult) { continue } @@ -197,6 +210,9 @@ func antigravityExistingToolCallKeys(payload []byte) map[string]bool { func antigravityReplayToolCallKeys(itemResult gjson.Result) []string { callID := strings.TrimSpace(itemResult.Get("call_id").String()) + if callID == "" { + callID = strings.TrimSpace(itemResult.Get("id").String()) + } name := strings.TrimSpace(itemResult.Get("name").String()) if name == "" { return nil @@ -231,6 +247,23 @@ func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool { return false } +func antigravityNeedsSignatureReplayForExistingFunctionCall(payload []byte, itemResult gjson.Result) bool { + callID := strings.TrimSpace(itemResult.Get("call_id").String()) + if callID == "" { + callID = strings.TrimSpace(itemResult.Get("id").String()) + } + sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String()) + if callID == "" || sig == "" { + return false + } + ci, pi, ok := antigravityFunctionCallPartLocation(payload, callID) + if !ok { + return false + } + pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) + return strings.TrimSpace(gjson.GetBytes(payload, pathSig).String()) == "" +} + func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool { callID := strings.TrimSpace(itemResult.Get("call_id").String()) if callID == "" { @@ -265,27 +298,32 @@ func antigravityFunctionResponseContentIndex(payload []byte, callID string) (int } func antigravityPayloadHasFunctionCallID(payload []byte, callID string) bool { + _, _, ok := antigravityFunctionCallPartLocation(payload, callID) + return ok +} + +func antigravityFunctionCallPartLocation(payload []byte, callID string) (contentIndex int, partIndex int, ok bool) { callID = strings.TrimSpace(callID) if callID == "" { - return false + return -1, -1, false } contents := gjson.GetBytes(payload, "request.contents") if !contents.IsArray() { - return false + return -1, -1, false } - for _, content := range contents.Array() { + for ci, content := range contents.Array() { parts := content.Get("parts") if !parts.IsArray() { continue } - for _, part := range parts.Array() { + for pi, part := range parts.Array() { fc := part.Get("functionCall") if fc.Exists() && strings.TrimSpace(fc.Get("id").String()) == callID { - return true + return ci, pi, true } } } - return false + return -1, -1, false } func insertAntigravityModelFunctionCallBeforeContent(payload []byte, beforeIndex int, name, callID, thoughtSig string, args gjson.Result) ([]byte, bool) { @@ -376,10 +414,18 @@ func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Res if name == "" || !args.Exists() { return payload, false } - if callID != "" && antigravityPayloadHasFunctionCallID(payload, callID) { - return payload, false - } if callID != "" { + if ci, pi, exists := antigravityFunctionCallPartLocation(payload, callID); exists { + if sig != "" { + pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi) + if strings.TrimSpace(gjson.GetBytes(payload, pathSig).String()) == "" { + if updated, err := sjson.SetBytes(payload, pathSig, sig); err == nil { + return updated, true + } + } + } + return payload, false + } if frIndex, ok := antigravityFunctionResponseContentIndex(payload, callID); ok { return insertAntigravityModelFunctionCallBeforeContent(payload, frIndex, name, callID, sig, args) } diff --git a/internal/runtime/executor/antigravity_reasoning_replay_test.go b/internal/runtime/executor/antigravity_reasoning_replay_test.go index da8b7d11a..688dd7ad9 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay_test.go +++ b/internal/runtime/executor/antigravity_reasoning_replay_test.go @@ -2,6 +2,7 @@ package executor import ( "context" + "strings" "testing" internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" @@ -95,3 +96,44 @@ func TestPrepareAntigravityGeminiReasoningReplayInsertsBeforeModelFunctionRespon t.Fatalf("functionResponse should be at [2]: %s", string(out)) } } + +func TestMergeAntigravityFunctionCallPartReplayMergesSignatureIntoExistingFunctionCall(t *testing.T) { + internalcache.ClearAntigravityReasoningReplayCache() + t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache) + + item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`) + internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-merge", [][]byte{item}) + + payload := []byte(`{"sessionId":"sess-merge","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"model","parts":[{"functionCall":{"id":"id1","name":"Read","args":{"file_path":"/a"}}}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`) + out, _, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", cliproxyexecutor.Request{}, cliproxyexecutor.Options{}, payload) + if err != nil { + t.Fatal(err) + } + if got := gjson.GetBytes(out, "request.contents.1.parts.0.thoughtSignature").String(); got != "sig-first" { + t.Fatalf("thoughtSignature = %q, want sig-first; body=%s", got, out) + } +} + +func TestAntigravityReasoningReplayScopeUsesStableSessionWithoutSessionId(t *testing.T) { + payload := []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"stable-user-text"}]}]}}`) + scope := antigravityReasoningReplayScopeFromPayload("gemini-3-flash-agent", payload) + if !scope.valid() { + t.Fatal("scope should be valid from stable session hash") + } + if !strings.HasPrefix(scope.sessionKey, "session:") { + t.Fatalf("sessionKey = %q", scope.sessionKey) + } +} + +func TestAntigravityReplayToolCallKeysUsesNativeFunctionCallID(t *testing.T) { + fc := gjson.Parse(`{"name":"Read","args":{"file_path":"/a"},"id":"id-native"}`) + keys := antigravityReplayToolCallKeysFromPart(fc) + if len(keys) != 1 { + t.Fatalf("keys = %v", keys) + } + fc2 := gjson.Parse(`{"name":"Read","args":{"file_path":"/a"},"id":"id-native-2"}`) + keys2 := antigravityReplayToolCallKeysFromPart(fc2) + if keys[0] == keys2[0] { + t.Fatalf("parallel tool calls should not share replay key: %v vs %v", keys, keys2) + } +} From ec8c2c29135c1aa0f0ae044b58cb51369860bbaf Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 18 Jun 2026 22:31:50 +0800 Subject: [PATCH 5/5] test(antigravity): cover invalid-signature replay cache clear Add executor httptest for upstream 400 clearing HOME reasoning replay items, and a whitespace call_id matcher regression test for replay filtering. --- ...antigravity_reasoning_replay_clear_test.go | 66 +++++++++++++++++++ .../antigravity_reasoning_replay_test.go | 7 ++ 2 files changed, 73 insertions(+) create mode 100644 internal/runtime/executor/antigravity_reasoning_replay_clear_test.go diff --git a/internal/runtime/executor/antigravity_reasoning_replay_clear_test.go b/internal/runtime/executor/antigravity_reasoning_replay_clear_test.go new file mode 100644 index 000000000..a15f15ece --- /dev/null +++ b/internal/runtime/executor/antigravity_reasoning_replay_clear_test.go @@ -0,0 +1,66 @@ +package executor + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache" + "github.com/router-for-me/CLIProxyAPI/v7/internal/config" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator" +) + +func TestAntigravityReasoningReplayClearsOnInvalidSignature400(t *testing.T) { + internalcache.ClearAntigravityReasoningReplayCache() + t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache) + + model := "gemini-3-flash-agent" + sessionKey := "session:pr3900-invalid-sig" + bad := []byte(`{"type":"thought_signature","thoughtSignature":"INVALID_REPLAY_SIGNATURE_PR3900_XXXXXXXXX","contentIndex":1,"partIndex":0}`) + if !internalcache.CacheAntigravityReasoningReplayItems(model, sessionKey, [][]byte{bad}) { + t.Fatal("failed to seed replay cache") + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":{"message":"Invalid thoughtSignature in model content","code":400}}`)) + })) + defer server.Close() + + exec := NewAntigravityExecutor(&config.Config{RequestRetry: 1}) + auth := &cliproxyauth.Auth{ + ID: "auth-pr3900-invalid-sig", + Attributes: map[string]string{ + "base_url": server.URL, + }, + Metadata: map[string]any{ + "access_token": "token", + "project_id": "project-1", + "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), + }, + } + + payload := []byte(`{"sessionId":"pr3900-invalid-sig","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Bash","response":{"result":"ok"}}}]}]}}`) + _, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ + Model: model, + Payload: payload, + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FormatAntigravity, + Stream: false, + }) + if err == nil { + t.Fatal("expected upstream 400 error") + } + if _, ok, errGet := internalcache.GetAntigravityReasoningReplayItemsRequired(context.Background(), model, sessionKey); errGet != nil { + t.Fatalf("get after clear: %v", errGet) + } else if ok { + t.Fatal("invalid signature 400 should clear cached replay item") + } +} diff --git a/internal/runtime/executor/antigravity_reasoning_replay_test.go b/internal/runtime/executor/antigravity_reasoning_replay_test.go index 688dd7ad9..cc53da279 100644 --- a/internal/runtime/executor/antigravity_reasoning_replay_test.go +++ b/internal/runtime/executor/antigravity_reasoning_replay_test.go @@ -137,3 +137,10 @@ func TestAntigravityReplayToolCallKeysUsesNativeFunctionCallID(t *testing.T) { t.Fatalf("parallel tool calls should not share replay key: %v vs %v", keys, keys2) } } + +func TestAntigravityRequestHasMatchingFunctionResponseWhitespaceCallID(t *testing.T) { + item := gjson.Parse(`{"call_id":" "}`) + if !antigravityRequestHasMatchingFunctionResponse(nil, item) { + t.Fatal("whitespace-only call_id should be treated as empty => true") + } +}