mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-24 17:57:46 +08:00
Merge pull request #3900 from sususu98/fix/antigravity-replay-fc-order-upstream-dev
fix(antigravity): HOME Gemini replay and functionCall ordering before functionResponse
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -25,6 +25,7 @@ static/*
|
||||
|
||||
# Authentication data
|
||||
auths/*
|
||||
/auths
|
||||
!auths/.gitkeep
|
||||
|
||||
# Documentation
|
||||
@@ -38,6 +39,7 @@ GEMINI.md
|
||||
.worktrees/
|
||||
.codex/*
|
||||
.claude/*
|
||||
.claude
|
||||
.gemini/*
|
||||
.serena/*
|
||||
.agent/*
|
||||
|
||||
347
internal/cache/antigravity_reasoning_replay_cache.go
vendored
Normal file
347
internal/cache/antigravity_reasoning_replay_cache.go
vendored
Normal file
@@ -0,0 +1,347 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
homekv "github.com/router-for-me/CLIProxyAPI/v7/internal/home"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
||||
const (
|
||||
// AntigravityReasoningReplayCacheTTL limits how long encrypted reasoning replay
|
||||
// items stay in process memory.
|
||||
AntigravityReasoningReplayCacheTTL = 1 * time.Hour
|
||||
|
||||
// AntigravityReasoningReplayCacheMaxEntries bounds process memory for replay
|
||||
// continuity. Oldest entries are evicted first.
|
||||
AntigravityReasoningReplayCacheMaxEntries = 10240
|
||||
|
||||
// AntigravityReasoningReplayCacheEvictBatchSize leaves headroom after the cache
|
||||
// reaches capacity so high write volume does not rescan the map every turn.
|
||||
AntigravityReasoningReplayCacheEvictBatchSize = 128
|
||||
|
||||
minAntigravityThoughtSignatureReplayLen = 16
|
||||
)
|
||||
|
||||
type antigravityReasoningReplayEntry struct {
|
||||
Items [][]byte
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
var (
|
||||
antigravityReasoningReplayMu sync.Mutex
|
||||
antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry)
|
||||
)
|
||||
|
||||
type antigravityReasoningReplayKVClient interface {
|
||||
KVGet(ctx context.Context, key string) ([]byte, bool, error)
|
||||
KVSet(ctx context.Context, key string, value []byte, opts homekv.KVSetOptions) (bool, error)
|
||||
KVDel(ctx context.Context, keys ...string) (int64, error)
|
||||
KVExpire(ctx context.Context, key string, ttl time.Duration) (bool, error)
|
||||
}
|
||||
|
||||
var currentAntigravityReasoningReplayKVClient = func() (antigravityReasoningReplayKVClient, bool, error) {
|
||||
return homekv.CurrentKVClient()
|
||||
}
|
||||
|
||||
// CacheAntigravityReasoningReplayItem stores a final GPT/Codex reasoning item for
|
||||
// stateless replay. The stored item is normalized to the minimal shape accepted
|
||||
// by Responses input replay.
|
||||
func CacheAntigravityReasoningReplayItem(modelName, sessionKey string, item []byte) bool {
|
||||
return CacheAntigravityReasoningReplayItems(modelName, sessionKey, [][]byte{item})
|
||||
}
|
||||
|
||||
// CacheAntigravityReasoningReplayItems stores the final GPT/Codex assistant output
|
||||
// items needed to replay a stateless next turn.
|
||||
func CacheAntigravityReasoningReplayItems(modelName, sessionKey string, items [][]byte) bool {
|
||||
return CacheAntigravityReasoningReplayItemsBestEffort(context.Background(), modelName, sessionKey, items)
|
||||
}
|
||||
|
||||
// CacheAntigravityReasoningReplayItemsBestEffort stores replay items for completed response paths.
|
||||
func CacheAntigravityReasoningReplayItemsBestEffort(ctx context.Context, modelName, sessionKey string, items [][]byte) bool {
|
||||
key := antigravityReasoningReplayCacheKey(modelName, sessionKey)
|
||||
if key == "" {
|
||||
return false
|
||||
}
|
||||
normalized, ok := normalizeAntigravityReasoningReplayItems(items)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if client, homeMode, errClient := currentAntigravityReasoningReplayKVClient(); homeMode {
|
||||
if errClient != nil {
|
||||
log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errClient)
|
||||
return false
|
||||
}
|
||||
raw, errMarshal := json.Marshal(normalized)
|
||||
if errMarshal != nil {
|
||||
log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errMarshal)
|
||||
return false
|
||||
}
|
||||
written, errSet := client.KVSet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), raw, homekv.KVSetOptions{EX: AntigravityReasoningReplayCacheTTL})
|
||||
if errSet != nil {
|
||||
log.Errorf("home kv best-effort antigravity reasoning replay set failed prefix=cpa:antigravity:*: %v", errSet)
|
||||
return false
|
||||
}
|
||||
return written
|
||||
}
|
||||
|
||||
cacheCleanupOnce.Do(startCacheCleanup)
|
||||
now := time.Now()
|
||||
antigravityReasoningReplayMu.Lock()
|
||||
defer antigravityReasoningReplayMu.Unlock()
|
||||
antigravityReasoningReplayEntries[key] = antigravityReasoningReplayEntry{
|
||||
Items: normalized,
|
||||
Timestamp: now,
|
||||
}
|
||||
if len(antigravityReasoningReplayEntries) > AntigravityReasoningReplayCacheMaxEntries {
|
||||
evictOldestAntigravityReasoningReplayEntries(AntigravityReasoningReplayCacheEvictBatchSize)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetAntigravityReasoningReplayItem retrieves a normalized reasoning replay item.
|
||||
func GetAntigravityReasoningReplayItem(modelName, sessionKey string) ([]byte, bool) {
|
||||
items, ok := GetAntigravityReasoningReplayItems(modelName, sessionKey)
|
||||
if !ok || len(items) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
return items[0], true
|
||||
}
|
||||
|
||||
// GetAntigravityReasoningReplayItems retrieves normalized assistant output items.
|
||||
func GetAntigravityReasoningReplayItems(modelName, sessionKey string) ([][]byte, bool) {
|
||||
items, ok, err := GetAntigravityReasoningReplayItemsRequired(context.Background(), modelName, sessionKey)
|
||||
if err == nil {
|
||||
return items, ok
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// GetAntigravityReasoningReplayItemsRequired retrieves replay items for request-time paths.
|
||||
func GetAntigravityReasoningReplayItemsRequired(ctx context.Context, modelName, sessionKey string) ([][]byte, bool, error) {
|
||||
key := antigravityReasoningReplayCacheKey(modelName, sessionKey)
|
||||
if key == "" {
|
||||
return nil, false, nil
|
||||
}
|
||||
client, homeMode, errClient := currentAntigravityReasoningReplayKVClient()
|
||||
if homeMode {
|
||||
if errClient != nil {
|
||||
return nil, false, errClient
|
||||
}
|
||||
raw, found, errGet := client.KVGet(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey))
|
||||
if errGet != nil || !found {
|
||||
return nil, false, errGet
|
||||
}
|
||||
var homeItems [][]byte
|
||||
if errUnmarshal := json.Unmarshal(raw, &homeItems); errUnmarshal != nil {
|
||||
return nil, false, errUnmarshal
|
||||
}
|
||||
if _, errExpire := client.KVExpire(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey), AntigravityReasoningReplayCacheTTL); errExpire != nil {
|
||||
return nil, false, errExpire
|
||||
}
|
||||
return cloneAntigravityReasoningReplayItems(homeItems), true, nil
|
||||
}
|
||||
|
||||
cacheCleanupOnce.Do(startCacheCleanup)
|
||||
now := time.Now()
|
||||
antigravityReasoningReplayMu.Lock()
|
||||
defer antigravityReasoningReplayMu.Unlock()
|
||||
entry, ok := antigravityReasoningReplayEntries[key]
|
||||
if !ok {
|
||||
return nil, false, nil
|
||||
}
|
||||
if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL {
|
||||
delete(antigravityReasoningReplayEntries, key)
|
||||
return nil, false, nil
|
||||
}
|
||||
entry.Timestamp = now
|
||||
antigravityReasoningReplayEntries[key] = entry
|
||||
return cloneAntigravityReasoningReplayItems(entry.Items), true, nil
|
||||
}
|
||||
|
||||
// DeleteAntigravityReasoningReplayItem removes one replay item after upstream rejects
|
||||
// it or the caller otherwise knows it is stale.
|
||||
func DeleteAntigravityReasoningReplayItem(modelName, sessionKey string) {
|
||||
if errDelete := DeleteAntigravityReasoningReplayItemRequired(context.Background(), modelName, sessionKey); errDelete != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteAntigravityReasoningReplayItemRequired removes one replay item for request-time paths.
|
||||
func DeleteAntigravityReasoningReplayItemRequired(ctx context.Context, modelName, sessionKey string) error {
|
||||
key := antigravityReasoningReplayCacheKey(modelName, sessionKey)
|
||||
if key == "" {
|
||||
return nil
|
||||
}
|
||||
client, homeMode, errClient := currentAntigravityReasoningReplayKVClient()
|
||||
if homeMode {
|
||||
if errClient != nil {
|
||||
return errClient
|
||||
}
|
||||
_, errDel := client.KVDel(ctx, antigravityReasoningReplayKVKey(modelName, sessionKey))
|
||||
return errDel
|
||||
}
|
||||
antigravityReasoningReplayMu.Lock()
|
||||
delete(antigravityReasoningReplayEntries, key)
|
||||
antigravityReasoningReplayMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearAntigravityReasoningReplayCache clears all Antigravity reasoning replay state.
|
||||
func ClearAntigravityReasoningReplayCache() {
|
||||
antigravityReasoningReplayMu.Lock()
|
||||
antigravityReasoningReplayEntries = make(map[string]antigravityReasoningReplayEntry)
|
||||
antigravityReasoningReplayMu.Unlock()
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayCacheKey(modelName, sessionKey string) string {
|
||||
modelName = strings.TrimSpace(modelName)
|
||||
sessionKey = strings.TrimSpace(sessionKey)
|
||||
if modelName == "" || sessionKey == "" {
|
||||
return ""
|
||||
}
|
||||
// The session key is the continuity boundary. Keep this independent from
|
||||
// the selected upstream Codex credential so auth failover can preserve replay.
|
||||
return strings.Join([]string{"antigravity-reasoning-replay", modelName, sessionKey}, "\x00")
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayKVKey(modelName, sessionKey string) string {
|
||||
return "cpa:antigravity:reasoning-replay:" + homekv.HashKeyPart(strings.TrimSpace(modelName)) + ":" + homekv.HashKeyPart(strings.TrimSpace(sessionKey))
|
||||
}
|
||||
|
||||
func normalizeAntigravityReasoningReplayItems(items [][]byte) ([][]byte, bool) {
|
||||
normalized := make([][]byte, 0, len(items))
|
||||
for _, item := range items {
|
||||
normalizedItem, ok := normalizeAntigravityReasoningReplayItem(item)
|
||||
if ok {
|
||||
normalized = append(normalized, normalizedItem)
|
||||
}
|
||||
}
|
||||
return normalized, len(normalized) > 0
|
||||
}
|
||||
|
||||
func normalizeAntigravityReasoningReplayItem(item []byte) ([]byte, bool) {
|
||||
itemResult := gjson.ParseBytes(item)
|
||||
switch strings.TrimSpace(itemResult.Get("type").String()) {
|
||||
case "thought_signature":
|
||||
return normalizeAntigravityThoughtSignatureReplayItem(itemResult)
|
||||
case "function_call_part":
|
||||
return normalizeAntigravityFunctionCallPartReplayItem(itemResult)
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeAntigravityThoughtSignatureReplayItem(itemResult gjson.Result) ([]byte, bool) {
|
||||
sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
|
||||
if sig == "" {
|
||||
sig = strings.TrimSpace(itemResult.Get("thought_signature").String())
|
||||
}
|
||||
if sig == "" || len(sig) < minAntigravityThoughtSignatureReplayLen {
|
||||
return nil, false
|
||||
}
|
||||
normalized := []byte(`{"type":"thought_signature"}`)
|
||||
normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig)
|
||||
if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number {
|
||||
normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int())
|
||||
}
|
||||
if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number {
|
||||
normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int())
|
||||
}
|
||||
return normalized, true
|
||||
}
|
||||
|
||||
func normalizeAntigravityFunctionCallPartReplayItem(itemResult gjson.Result) ([]byte, bool) {
|
||||
callID := strings.TrimSpace(itemResult.Get("call_id").String())
|
||||
if callID == "" {
|
||||
callID = strings.TrimSpace(itemResult.Get("id").String())
|
||||
}
|
||||
name := strings.TrimSpace(itemResult.Get("name").String())
|
||||
args := itemResult.Get("args")
|
||||
if name == "" || !args.Exists() {
|
||||
fc := itemResult.Get("functionCall")
|
||||
if fc.Exists() {
|
||||
if callID == "" {
|
||||
callID = strings.TrimSpace(fc.Get("id").String())
|
||||
}
|
||||
if name == "" {
|
||||
name = strings.TrimSpace(fc.Get("name").String())
|
||||
}
|
||||
if !args.Exists() {
|
||||
args = fc.Get("args")
|
||||
}
|
||||
}
|
||||
}
|
||||
if name == "" || !args.Exists() {
|
||||
return nil, false
|
||||
}
|
||||
normalized := []byte(`{"type":"function_call_part"}`)
|
||||
if callID != "" {
|
||||
normalized, _ = sjson.SetBytes(normalized, "call_id", callID)
|
||||
}
|
||||
normalized, _ = sjson.SetBytes(normalized, "name", name)
|
||||
if args.Type == gjson.String {
|
||||
normalized, _ = sjson.SetBytes(normalized, "args", args.String())
|
||||
} else {
|
||||
normalized, _ = sjson.SetRawBytes(normalized, "args", []byte(args.Raw))
|
||||
}
|
||||
sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
|
||||
if sig != "" {
|
||||
normalized, _ = sjson.SetBytes(normalized, "thoughtSignature", sig)
|
||||
}
|
||||
if contentIndex := itemResult.Get("contentIndex"); contentIndex.Type == gjson.Number {
|
||||
normalized, _ = sjson.SetBytes(normalized, "contentIndex", contentIndex.Int())
|
||||
}
|
||||
if partIndex := itemResult.Get("partIndex"); partIndex.Type == gjson.Number {
|
||||
normalized, _ = sjson.SetBytes(normalized, "partIndex", partIndex.Int())
|
||||
}
|
||||
return normalized, true
|
||||
}
|
||||
|
||||
func cloneAntigravityReasoningReplayItems(items [][]byte) [][]byte {
|
||||
cloned := make([][]byte, 0, len(items))
|
||||
for _, item := range items {
|
||||
cloned = append(cloned, append([]byte(nil), item...))
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func evictOldestAntigravityReasoningReplayEntries(count int) {
|
||||
if count <= 0 || len(antigravityReasoningReplayEntries) == 0 {
|
||||
return
|
||||
}
|
||||
type candidate struct {
|
||||
key string
|
||||
timestamp time.Time
|
||||
}
|
||||
candidates := make([]candidate, 0, len(antigravityReasoningReplayEntries))
|
||||
for key, entry := range antigravityReasoningReplayEntries {
|
||||
candidates = append(candidates, candidate{key: key, timestamp: entry.Timestamp})
|
||||
}
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
return candidates[i].timestamp.Before(candidates[j].timestamp)
|
||||
})
|
||||
if count > len(candidates) {
|
||||
count = len(candidates)
|
||||
}
|
||||
for i := 0; i < count; i++ {
|
||||
delete(antigravityReasoningReplayEntries, candidates[i].key)
|
||||
}
|
||||
}
|
||||
|
||||
func purgeExpiredAntigravityReasoningReplayCache(now time.Time) {
|
||||
antigravityReasoningReplayMu.Lock()
|
||||
for key, entry := range antigravityReasoningReplayEntries {
|
||||
if now.Sub(entry.Timestamp) > AntigravityReasoningReplayCacheTTL {
|
||||
delete(antigravityReasoningReplayEntries, key)
|
||||
}
|
||||
}
|
||||
antigravityReasoningReplayMu.Unlock()
|
||||
}
|
||||
1
internal/cache/signature_cache.go
vendored
1
internal/cache/signature_cache.go
vendored
@@ -109,6 +109,7 @@ func purgeExpiredCaches() {
|
||||
return true
|
||||
})
|
||||
purgeExpiredCodexReasoningReplayCache(now)
|
||||
purgeExpiredAntigravityReasoningReplayCache(now)
|
||||
}
|
||||
|
||||
// CacheSignature stores a thinking signature for a given model group and text.
|
||||
|
||||
@@ -306,9 +306,6 @@ func validateAntigravityRequestSignatures(ctx context.Context, modelName string,
|
||||
rawJSON = antigravityclaude.StripEmptySignatureThinkingBlocks(rawJSON)
|
||||
logAntigravitySignatureStrip(before, countClaudeThinkingBlocks(rawJSON), "prefix_cleanup", "empty_or_non_claude_signature")
|
||||
if cache.SignatureCacheEnabled() {
|
||||
if errRequire := antigravityclaude.RequireCachedThinkingSignatures(ctx, modelName, rawJSON); errRequire != nil {
|
||||
return nil, homeKVUnavailableStatusErr(errRequire)
|
||||
}
|
||||
return rawJSON, nil
|
||||
}
|
||||
if !cache.SignatureBypassStrictMode() {
|
||||
@@ -691,6 +688,15 @@ attemptLoop:
|
||||
helps.MarkCreditsUsed(ctx)
|
||||
}
|
||||
}
|
||||
replayScope := antigravityReasoningReplayScope{}
|
||||
if antigravityUsesReasoningReplayCache(baseModel) {
|
||||
var errReplay error
|
||||
requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload)
|
||||
if errReplay != nil {
|
||||
err = errReplay
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, false, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
@@ -798,6 +804,10 @@ attemptLoop:
|
||||
continue attemptLoop
|
||||
}
|
||||
}
|
||||
if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil {
|
||||
err = errClear
|
||||
return resp, err
|
||||
}
|
||||
err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes)
|
||||
return resp, err
|
||||
}
|
||||
@@ -806,6 +816,7 @@ attemptLoop:
|
||||
if useCredits {
|
||||
clearAntigravityCreditsFailureState(auth)
|
||||
}
|
||||
cacheAntigravityReasoningReplayFromResponse(ctx, replayScope, requestPayload, bodyBytes)
|
||||
bodyBytes = e.resolveWebSearchGroundingURLs(ctx, auth, from, originalPayload, translated, bodyBytes)
|
||||
reporter.Publish(ctx, helps.ParseAntigravityUsage(bodyBytes))
|
||||
var param any
|
||||
@@ -1369,6 +1380,15 @@ attemptLoop:
|
||||
helps.MarkCreditsUsed(ctx)
|
||||
}
|
||||
}
|
||||
replayScope := antigravityReasoningReplayScope{}
|
||||
if antigravityUsesReasoningReplayCache(baseModel) {
|
||||
var errReplay error
|
||||
requestPayload, replayScope, errReplay = prepareAntigravityGeminiReasoningReplayPayload(ctx, baseModel, req, opts, requestPayload)
|
||||
if errReplay != nil {
|
||||
err = errReplay
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, true, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
@@ -1487,6 +1507,10 @@ attemptLoop:
|
||||
continue attemptLoop
|
||||
}
|
||||
}
|
||||
if errClear := clearAntigravityReasoningReplayOnInvalidSignature(ctx, replayScope, httpResp.StatusCode, bodyBytes); errClear != nil {
|
||||
err = errClear
|
||||
return nil, err
|
||||
}
|
||||
err = newAntigravityStatusErr(httpResp.StatusCode, bodyBytes)
|
||||
return nil, err
|
||||
}
|
||||
@@ -1495,12 +1519,16 @@ attemptLoop:
|
||||
if useCredits {
|
||||
clearAntigravityCreditsFailureState(auth)
|
||||
}
|
||||
replayAccumulator := newAntigravityReasoningReplayAccumulator(replayScope, requestPayload)
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
go func(resp *http.Response) {
|
||||
defer close(out)
|
||||
defer func() {
|
||||
if replayAccumulator != nil {
|
||||
replayAccumulator.Flush(ctx)
|
||||
}
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
log.Errorf("antigravity executor: close response line error: %v", errClose)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
@@ -1509,6 +1537,9 @@ attemptLoop:
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
|
||||
if replayAccumulator != nil {
|
||||
replayAccumulator.ObserveSSELine(line)
|
||||
}
|
||||
|
||||
// Filter usage metadata for all models
|
||||
// Only retain usage statistics in the terminal chunk
|
||||
@@ -2229,9 +2260,10 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau
|
||||
payloadStr, _ = sjson.Delete(payloadStr, "request.generationConfig.maxOutputTokens")
|
||||
}
|
||||
|
||||
bodyReader = strings.NewReader(payloadStr)
|
||||
payloadStrBytes := applyAntigravityNativeSignatureReplayIfNeeded(modelName, []byte(payloadStr))
|
||||
bodyReader = bytes.NewReader(payloadStrBytes)
|
||||
if e.cfg != nil && e.cfg.RequestLog {
|
||||
payloadLog = []byte(payloadStr)
|
||||
payloadLog = append([]byte(nil), payloadStrBytes...)
|
||||
}
|
||||
} else {
|
||||
if strings.Contains(modelName, "claude") {
|
||||
@@ -2240,6 +2272,7 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau
|
||||
payload, _ = sjson.DeleteBytes(payload, "request.generationConfig.maxOutputTokens")
|
||||
}
|
||||
|
||||
payload = applyAntigravityNativeSignatureReplayIfNeeded(modelName, payload)
|
||||
bodyReader = bytes.NewReader(payload)
|
||||
if e.cfg != nil && e.cfg.RequestLog {
|
||||
payloadLog = append([]byte(nil), payload...)
|
||||
|
||||
607
internal/runtime/executor/antigravity_reasoning_replay.go
Normal file
607
internal/runtime/executor/antigravity_reasoning_replay.go
Normal file
@@ -0,0 +1,607 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
||||
type antigravityReasoningReplayScope struct {
|
||||
modelName string
|
||||
sessionKey string
|
||||
}
|
||||
|
||||
func (s antigravityReasoningReplayScope) valid() bool {
|
||||
return strings.TrimSpace(s.modelName) != "" && strings.TrimSpace(s.sessionKey) != ""
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayScopeFromPayload(modelName string, payload []byte) antigravityReasoningReplayScope {
|
||||
sessionID := antigravityReplaySessionIDFromPayload(payload)
|
||||
if sessionID == "" {
|
||||
if stable := strings.TrimSpace(generateStableSessionID(payload)); stable != "" {
|
||||
sessionID = strings.TrimPrefix(stable, "-")
|
||||
if sessionID == "" {
|
||||
sessionID = stable
|
||||
}
|
||||
}
|
||||
}
|
||||
if sessionID == "" {
|
||||
return antigravityReasoningReplayScope{}
|
||||
}
|
||||
return antigravityReasoningReplayScope{
|
||||
modelName: strings.TrimSpace(modelName),
|
||||
sessionKey: "session:" + sessionID,
|
||||
}
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayScopeFromRequest(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) antigravityReasoningReplayScope {
|
||||
if scope := antigravityReasoningReplayScopeFromPayload(modelName, payload); scope.valid() {
|
||||
return scope
|
||||
}
|
||||
if scope := antigravityReasoningReplayScopeFromPayload(modelName, req.Payload); scope.valid() {
|
||||
return scope
|
||||
}
|
||||
if value := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
|
||||
return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value}
|
||||
}
|
||||
if value := metadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" {
|
||||
return antigravityReasoningReplayScope{modelName: modelName, sessionKey: "execution:" + value}
|
||||
}
|
||||
_ = ctx
|
||||
return antigravityReasoningReplayScope{}
|
||||
}
|
||||
|
||||
func antigravityReplaySessionIDFromPayload(payload []byte) string {
|
||||
if len(payload) == 0 {
|
||||
return ""
|
||||
}
|
||||
for _, path := range []string{"sessionId", "session_id", "request.sessionId", "request.session_id"} {
|
||||
if id := strings.TrimSpace(gjson.GetBytes(payload, path).String()); id != "" {
|
||||
return id
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayPendingModelContentIndex(payload []byte) (contentIndex int, basePartIndex int) {
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return 0, 0
|
||||
}
|
||||
arr := contents.Array()
|
||||
if len(arr) == 0 {
|
||||
return 0, 0
|
||||
}
|
||||
last := arr[len(arr)-1]
|
||||
if strings.EqualFold(strings.TrimSpace(last.Get("role").String()), "model") {
|
||||
ci := len(arr) - 1
|
||||
parts := last.Get("parts")
|
||||
base := 0
|
||||
if parts.IsArray() {
|
||||
base = len(parts.Array())
|
||||
}
|
||||
return ci, base
|
||||
}
|
||||
return len(arr), 0
|
||||
}
|
||||
|
||||
func antigravityReasoningReplayResolveContentIndex(payload []byte, cached int) int {
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return cached
|
||||
}
|
||||
arr := contents.Array()
|
||||
if cached >= 0 && cached < len(arr) {
|
||||
return cached
|
||||
}
|
||||
for i := len(arr) - 1; i >= 0; i-- {
|
||||
if strings.EqualFold(strings.TrimSpace(arr[i].Get("role").String()), "model") {
|
||||
return i
|
||||
}
|
||||
}
|
||||
if len(arr) == 0 {
|
||||
return 0
|
||||
}
|
||||
return len(arr) - 1
|
||||
}
|
||||
|
||||
func prepareAntigravityGeminiReasoningReplayPayload(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) {
|
||||
if !antigravityUsesReasoningReplayCache(modelName) {
|
||||
return payload, antigravityReasoningReplayScope{}, nil
|
||||
}
|
||||
return applyAntigravityReasoningReplayCache(ctx, modelName, req, opts, payload)
|
||||
}
|
||||
|
||||
func clearAntigravityReasoningReplayOnInvalidSignature(ctx context.Context, scope antigravityReasoningReplayScope, statusCode int, body []byte) error {
|
||||
if !scope.valid() {
|
||||
return nil
|
||||
}
|
||||
if statusCode != http.StatusBadRequest {
|
||||
return nil
|
||||
}
|
||||
bodyText := strings.ToLower(string(body))
|
||||
if !strings.Contains(bodyText, "thoughtsignature") && !strings.Contains(bodyText, "thought_signature") && !strings.Contains(bodyText, "signature") {
|
||||
return nil
|
||||
}
|
||||
return internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, scope.modelName, scope.sessionKey)
|
||||
}
|
||||
|
||||
func applyAntigravityReasoningReplayCache(ctx context.Context, modelName string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, payload []byte) ([]byte, antigravityReasoningReplayScope, error) {
|
||||
scope := antigravityReasoningReplayScopeFromRequest(ctx, modelName, req, opts, payload)
|
||||
if !scope.valid() {
|
||||
return payload, scope, nil
|
||||
}
|
||||
items, ok, err := internalcache.GetAntigravityReasoningReplayItemsRequired(ctx, scope.modelName, scope.sessionKey)
|
||||
if err != nil || !ok || len(items) == 0 {
|
||||
return payload, scope, err
|
||||
}
|
||||
items = filterAntigravityReasoningReplayItemsForRequest(payload, items)
|
||||
if len(items) == 0 {
|
||||
return payload, scope, nil
|
||||
}
|
||||
updated, okApply := insertAntigravityReasoningReplayItems(payload, items)
|
||||
if !okApply {
|
||||
return payload, scope, nil
|
||||
}
|
||||
return updated, scope, nil
|
||||
}
|
||||
|
||||
func filterAntigravityReasoningReplayItemsForRequest(payload []byte, items [][]byte) [][]byte {
|
||||
existing := antigravityExistingToolCallKeys(payload)
|
||||
filtered := make([][]byte, 0, len(items))
|
||||
for _, item := range items {
|
||||
itemResult := gjson.ParseBytes(item)
|
||||
switch strings.TrimSpace(itemResult.Get("type").String()) {
|
||||
case "function_call_part":
|
||||
keys := antigravityReplayToolCallKeys(itemResult)
|
||||
if len(keys) == 0 {
|
||||
continue
|
||||
}
|
||||
if antigravityAnyKeyExists(existing, keys) {
|
||||
if !antigravityNeedsSignatureReplayForExistingFunctionCall(payload, itemResult) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !antigravityRequestHasMatchingFunctionResponse(payload, itemResult) {
|
||||
continue
|
||||
}
|
||||
case "thought_signature":
|
||||
if antigravityRequestHasThoughtSignatureAt(payload, itemResult) {
|
||||
continue
|
||||
}
|
||||
default:
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, item)
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func antigravityExistingToolCallKeys(payload []byte) map[string]bool {
|
||||
existing := make(map[string]bool)
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return existing
|
||||
}
|
||||
for _, content := range contents.Array() {
|
||||
parts := content.Get("parts")
|
||||
if !parts.IsArray() {
|
||||
continue
|
||||
}
|
||||
for _, part := range parts.Array() {
|
||||
if fc := part.Get("functionCall"); fc.Exists() {
|
||||
for _, key := range antigravityReplayToolCallKeysFromPart(fc) {
|
||||
existing[key] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return existing
|
||||
}
|
||||
|
||||
func antigravityReplayToolCallKeys(itemResult gjson.Result) []string {
|
||||
callID := strings.TrimSpace(itemResult.Get("call_id").String())
|
||||
if callID == "" {
|
||||
callID = strings.TrimSpace(itemResult.Get("id").String())
|
||||
}
|
||||
name := strings.TrimSpace(itemResult.Get("name").String())
|
||||
if name == "" {
|
||||
return nil
|
||||
}
|
||||
args := itemResult.Get("args").Raw
|
||||
key := antigravityFunctionCallKey(name, args, callID)
|
||||
if key == "" {
|
||||
return nil
|
||||
}
|
||||
return []string{key}
|
||||
}
|
||||
|
||||
func antigravityReplayToolCallKeysFromPart(fc gjson.Result) []string {
|
||||
return antigravityReplayToolCallKeys(gjson.Parse(fc.Raw))
|
||||
}
|
||||
|
||||
func antigravityFunctionCallKey(name, argsRaw, callID string) string {
|
||||
name = strings.TrimSpace(name)
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
h := sha256.Sum256([]byte(strings.Join([]string{name, argsRaw, callID}, "\x00")))
|
||||
return fmt.Sprintf("fc:%x", h[:8])
|
||||
}
|
||||
|
||||
func antigravityAnyKeyExists(existing map[string]bool, keys []string) bool {
|
||||
for _, key := range keys {
|
||||
if existing[key] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func antigravityNeedsSignatureReplayForExistingFunctionCall(payload []byte, itemResult gjson.Result) bool {
|
||||
callID := strings.TrimSpace(itemResult.Get("call_id").String())
|
||||
if callID == "" {
|
||||
callID = strings.TrimSpace(itemResult.Get("id").String())
|
||||
}
|
||||
sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
|
||||
if callID == "" || sig == "" {
|
||||
return false
|
||||
}
|
||||
ci, pi, ok := antigravityFunctionCallPartLocation(payload, callID)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
|
||||
return strings.TrimSpace(gjson.GetBytes(payload, pathSig).String()) == ""
|
||||
}
|
||||
|
||||
func antigravityRequestHasMatchingFunctionResponse(payload []byte, itemResult gjson.Result) bool {
|
||||
callID := strings.TrimSpace(itemResult.Get("call_id").String())
|
||||
if callID == "" {
|
||||
return true
|
||||
}
|
||||
_, ok := antigravityFunctionResponseContentIndex(payload, callID)
|
||||
return ok
|
||||
}
|
||||
|
||||
func antigravityFunctionResponseContentIndex(payload []byte, callID string) (int, bool) {
|
||||
callID = strings.TrimSpace(callID)
|
||||
if callID == "" {
|
||||
return -1, false
|
||||
}
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return -1, false
|
||||
}
|
||||
for i, content := range contents.Array() {
|
||||
parts := content.Get("parts")
|
||||
if !parts.IsArray() {
|
||||
continue
|
||||
}
|
||||
for _, part := range parts.Array() {
|
||||
fr := part.Get("functionResponse")
|
||||
if fr.Exists() && strings.TrimSpace(fr.Get("id").String()) == callID {
|
||||
return i, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
|
||||
func antigravityPayloadHasFunctionCallID(payload []byte, callID string) bool {
|
||||
_, _, ok := antigravityFunctionCallPartLocation(payload, callID)
|
||||
return ok
|
||||
}
|
||||
|
||||
func antigravityFunctionCallPartLocation(payload []byte, callID string) (contentIndex int, partIndex int, ok bool) {
|
||||
callID = strings.TrimSpace(callID)
|
||||
if callID == "" {
|
||||
return -1, -1, false
|
||||
}
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return -1, -1, false
|
||||
}
|
||||
for ci, content := range contents.Array() {
|
||||
parts := content.Get("parts")
|
||||
if !parts.IsArray() {
|
||||
continue
|
||||
}
|
||||
for pi, part := range parts.Array() {
|
||||
fc := part.Get("functionCall")
|
||||
if fc.Exists() && strings.TrimSpace(fc.Get("id").String()) == callID {
|
||||
return ci, pi, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1, -1, false
|
||||
}
|
||||
|
||||
func insertAntigravityModelFunctionCallBeforeContent(payload []byte, beforeIndex int, name, callID, thoughtSig string, args gjson.Result) ([]byte, bool) {
|
||||
contents := gjson.GetBytes(payload, "request.contents")
|
||||
if !contents.IsArray() {
|
||||
return payload, false
|
||||
}
|
||||
arr := contents.Array()
|
||||
if beforeIndex < 0 || beforeIndex > len(arr) {
|
||||
return payload, false
|
||||
}
|
||||
fc := map[string]any{"name": name}
|
||||
if callID != "" {
|
||||
fc["id"] = callID
|
||||
}
|
||||
if args.Exists() {
|
||||
fc["args"] = args.Value()
|
||||
}
|
||||
part := map[string]any{"functionCall": fc}
|
||||
if thoughtSig != "" {
|
||||
part["thoughtSignature"] = thoughtSig
|
||||
}
|
||||
newContent := map[string]any{
|
||||
"role": "model",
|
||||
"parts": []any{part},
|
||||
}
|
||||
newArr := make([]any, 0, len(arr)+1)
|
||||
for i := 0; i < beforeIndex; i++ {
|
||||
newArr = append(newArr, arr[i].Value())
|
||||
}
|
||||
newArr = append(newArr, newContent)
|
||||
for i := beforeIndex; i < len(arr); i++ {
|
||||
newArr = append(newArr, arr[i].Value())
|
||||
}
|
||||
updated, err := sjson.SetBytes(payload, "request.contents", newArr)
|
||||
if err != nil {
|
||||
return payload, false
|
||||
}
|
||||
return updated, true
|
||||
}
|
||||
|
||||
func antigravityRequestHasThoughtSignatureAt(payload []byte, itemResult gjson.Result) bool {
|
||||
ci := int(itemResult.Get("contentIndex").Int())
|
||||
pi := int(itemResult.Get("partIndex").Int())
|
||||
path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
|
||||
return strings.TrimSpace(gjson.GetBytes(payload, path).String()) != ""
|
||||
}
|
||||
|
||||
func insertAntigravityReasoningReplayItems(payload []byte, items [][]byte) ([]byte, bool) {
|
||||
out := payload
|
||||
changed := false
|
||||
for _, item := range items {
|
||||
itemResult := gjson.ParseBytes(item)
|
||||
switch strings.TrimSpace(itemResult.Get("type").String()) {
|
||||
case "thought_signature":
|
||||
ci := antigravityReasoningReplayResolveContentIndex(out, int(itemResult.Get("contentIndex").Int()))
|
||||
pi := int(itemResult.Get("partIndex").Int())
|
||||
sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
|
||||
if sig == "" {
|
||||
continue
|
||||
}
|
||||
path := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
|
||||
if strings.TrimSpace(gjson.GetBytes(out, path).String()) != "" {
|
||||
continue
|
||||
}
|
||||
updated, err := sjson.SetBytes(out, path, sig)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
out = updated
|
||||
changed = true
|
||||
case "function_call_part":
|
||||
updated, ok := mergeAntigravityFunctionCallPartReplay(out, itemResult)
|
||||
if ok {
|
||||
out = updated
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
return out, changed
|
||||
}
|
||||
|
||||
func mergeAntigravityFunctionCallPartReplay(payload []byte, itemResult gjson.Result) ([]byte, bool) {
|
||||
name := strings.TrimSpace(itemResult.Get("name").String())
|
||||
args := itemResult.Get("args")
|
||||
callID := strings.TrimSpace(itemResult.Get("call_id").String())
|
||||
sig := strings.TrimSpace(itemResult.Get("thoughtSignature").String())
|
||||
if name == "" || !args.Exists() {
|
||||
return payload, false
|
||||
}
|
||||
if callID != "" {
|
||||
if ci, pi, exists := antigravityFunctionCallPartLocation(payload, callID); exists {
|
||||
if sig != "" {
|
||||
pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
|
||||
if strings.TrimSpace(gjson.GetBytes(payload, pathSig).String()) == "" {
|
||||
if updated, err := sjson.SetBytes(payload, pathSig, sig); err == nil {
|
||||
return updated, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return payload, false
|
||||
}
|
||||
if frIndex, ok := antigravityFunctionResponseContentIndex(payload, callID); ok {
|
||||
return insertAntigravityModelFunctionCallBeforeContent(payload, frIndex, name, callID, sig, args)
|
||||
}
|
||||
}
|
||||
|
||||
ci := antigravityReasoningReplayResolveContentIndex(payload, int(itemResult.Get("contentIndex").Int()))
|
||||
pi := int(itemResult.Get("partIndex").Int())
|
||||
pathSig := fmt.Sprintf("request.contents.%d.parts.%d.thoughtSignature", ci, pi)
|
||||
out := payload
|
||||
changed := false
|
||||
if sig != "" && strings.TrimSpace(gjson.GetBytes(out, pathSig).String()) == "" {
|
||||
if updated, err := sjson.SetBytes(out, pathSig, sig); err == nil {
|
||||
out = updated
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
pathFC := fmt.Sprintf("request.contents.%d.parts.%d.functionCall", ci, pi)
|
||||
if !gjson.GetBytes(out, pathFC).Exists() {
|
||||
fc := map[string]any{"name": name}
|
||||
if callID != "" {
|
||||
fc["id"] = callID
|
||||
}
|
||||
if args.Type == gjson.String {
|
||||
fc["args"] = args.String()
|
||||
} else {
|
||||
var parsed any
|
||||
if json.Unmarshal([]byte(args.Raw), &parsed) == nil {
|
||||
fc["args"] = parsed
|
||||
}
|
||||
}
|
||||
if updated, err := sjson.SetBytes(out, pathFC, fc); err == nil {
|
||||
out = updated
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
return out, changed
|
||||
}
|
||||
|
||||
type antigravityReasoningReplayAccumulator struct {
|
||||
scope antigravityReasoningReplayScope
|
||||
requestPayload []byte
|
||||
items [][]byte
|
||||
seenFC map[string]bool
|
||||
contentIndex int
|
||||
nextPartIndex int
|
||||
}
|
||||
|
||||
func newAntigravityReasoningReplayAccumulator(scope antigravityReasoningReplayScope, requestPayload []byte) *antigravityReasoningReplayAccumulator {
|
||||
if !scope.valid() {
|
||||
return nil
|
||||
}
|
||||
contentIndex, basePartIndex := antigravityReasoningReplayPendingModelContentIndex(requestPayload)
|
||||
return &antigravityReasoningReplayAccumulator{
|
||||
scope: scope,
|
||||
requestPayload: append([]byte(nil), requestPayload...),
|
||||
seenFC: make(map[string]bool),
|
||||
contentIndex: contentIndex,
|
||||
nextPartIndex: basePartIndex,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *antigravityReasoningReplayAccumulator) ObserveSSELine(line []byte) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
payload := helps.JSONPayload(line)
|
||||
if payload == nil {
|
||||
return
|
||||
}
|
||||
a.observeResponsePayload(payload)
|
||||
}
|
||||
|
||||
func (a *antigravityReasoningReplayAccumulator) observeResponsePayload(payload []byte) {
|
||||
parts := gjson.GetBytes(payload, "response.candidates.0.content.parts")
|
||||
if !parts.IsArray() {
|
||||
return
|
||||
}
|
||||
parts.ForEach(func(_, part gjson.Result) bool {
|
||||
pi := a.nextPartIndex
|
||||
a.nextPartIndex++
|
||||
sig := antigravityNativePartThoughtSignature(part)
|
||||
if fc := part.Get("functionCall"); fc.Exists() {
|
||||
keys := antigravityReplayToolCallKeysFromPart(fc)
|
||||
for _, k := range keys {
|
||||
if a.seenFC[k] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, k := range keys {
|
||||
a.seenFC[k] = true
|
||||
}
|
||||
item := buildAntigravityFunctionCallPartItem(a.contentIndex, pi, fc, sig)
|
||||
if len(item) > 0 {
|
||||
a.items = append(a.items, item)
|
||||
}
|
||||
return true
|
||||
}
|
||||
if sig != "" {
|
||||
item := buildAntigravityThoughtSignatureItem(a.contentIndex, pi, sig)
|
||||
a.items = append(a.items, item)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func buildAntigravityThoughtSignatureItem(contentIndex, partIndex int, signature string) []byte {
|
||||
return []byte(fmt.Sprintf(`{"type":"thought_signature","thoughtSignature":%q,"contentIndex":%d,"partIndex":%d}`,
|
||||
signature, contentIndex, partIndex))
|
||||
}
|
||||
|
||||
func buildAntigravityFunctionCallPartItem(contentIndex, partIndex int, fc gjson.Result, signature string) []byte {
|
||||
item := map[string]any{
|
||||
"type": "function_call_part",
|
||||
"contentIndex": contentIndex,
|
||||
"partIndex": partIndex,
|
||||
"name": fc.Get("name").String(),
|
||||
}
|
||||
if id := strings.TrimSpace(fc.Get("id").String()); id != "" {
|
||||
item["call_id"] = id
|
||||
}
|
||||
if args := fc.Get("args"); args.Exists() {
|
||||
if args.Type == gjson.String {
|
||||
item["args"] = args.String()
|
||||
} else {
|
||||
item["args"] = json.RawMessage(args.Raw)
|
||||
}
|
||||
}
|
||||
if signature != "" {
|
||||
item["thoughtSignature"] = signature
|
||||
}
|
||||
raw, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return raw
|
||||
}
|
||||
|
||||
func (a *antigravityReasoningReplayAccumulator) Flush(ctx context.Context) {
|
||||
if a == nil || !a.scope.valid() || len(a.items) == 0 {
|
||||
return
|
||||
}
|
||||
if !internalcache.CacheAntigravityReasoningReplayItemsBestEffort(ctx, a.scope.modelName, a.scope.sessionKey, a.items) {
|
||||
_ = internalcache.DeleteAntigravityReasoningReplayItemRequired(ctx, a.scope.modelName, a.scope.sessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
func cacheAntigravityReasoningReplayFromResponse(ctx context.Context, scope antigravityReasoningReplayScope, requestPayload, body []byte) {
|
||||
if !scope.valid() || len(body) == 0 {
|
||||
return
|
||||
}
|
||||
acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload)
|
||||
acc.observeResponsePayload(body)
|
||||
acc.Flush(ctx)
|
||||
}
|
||||
|
||||
func applyAntigravityNativeSignatureReplayIfNeeded(modelName string, payload []byte) []byte {
|
||||
if antigravityUsesReasoningReplayCache(modelName) {
|
||||
return payload
|
||||
}
|
||||
// Native per-part signature replay is not on upstream/dev; Gemini uses HOME replay only.
|
||||
return payload
|
||||
}
|
||||
|
||||
func antigravityUsesReasoningReplayCache(modelName string) bool {
|
||||
modelName = strings.ToLower(modelName)
|
||||
if strings.Contains(modelName, "claude") {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(modelName, "gemini") || strings.Contains(modelName, "flash") || strings.Contains(modelName, "agent")
|
||||
}
|
||||
|
||||
func antigravityNativePartThoughtSignature(part gjson.Result) string {
|
||||
for _, path := range []string{"thoughtSignature", "thought_signature", "extra_content.google.thought_signature"} {
|
||||
if signature := strings.TrimSpace(part.Get(path).String()); signature != "" {
|
||||
return signature
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
|
||||
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
|
||||
sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator"
|
||||
)
|
||||
|
||||
func TestAntigravityReasoningReplayClearsOnInvalidSignature400(t *testing.T) {
|
||||
internalcache.ClearAntigravityReasoningReplayCache()
|
||||
t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
|
||||
|
||||
model := "gemini-3-flash-agent"
|
||||
sessionKey := "session:pr3900-invalid-sig"
|
||||
bad := []byte(`{"type":"thought_signature","thoughtSignature":"INVALID_REPLAY_SIGNATURE_PR3900_XXXXXXXXX","contentIndex":1,"partIndex":0}`)
|
||||
if !internalcache.CacheAntigravityReasoningReplayItems(model, sessionKey, [][]byte{bad}) {
|
||||
t.Fatal("failed to seed replay cache")
|
||||
}
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.ReadAll(r.Body)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = w.Write([]byte(`{"error":{"message":"Invalid thoughtSignature in model content","code":400}}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
exec := NewAntigravityExecutor(&config.Config{RequestRetry: 1})
|
||||
auth := &cliproxyauth.Auth{
|
||||
ID: "auth-pr3900-invalid-sig",
|
||||
Attributes: map[string]string{
|
||||
"base_url": server.URL,
|
||||
},
|
||||
Metadata: map[string]any{
|
||||
"access_token": "token",
|
||||
"project_id": "project-1",
|
||||
"expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339),
|
||||
},
|
||||
}
|
||||
|
||||
payload := []byte(`{"sessionId":"pr3900-invalid-sig","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Bash","response":{"result":"ok"}}}]}]}}`)
|
||||
_, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{
|
||||
Model: model,
|
||||
Payload: payload,
|
||||
}, cliproxyexecutor.Options{
|
||||
SourceFormat: sdktranslator.FormatAntigravity,
|
||||
Stream: false,
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected upstream 400 error")
|
||||
}
|
||||
if _, ok, errGet := internalcache.GetAntigravityReasoningReplayItemsRequired(context.Background(), model, sessionKey); errGet != nil {
|
||||
t.Fatalf("get after clear: %v", errGet)
|
||||
} else if ok {
|
||||
t.Fatal("invalid signature 400 should clear cached replay item")
|
||||
}
|
||||
}
|
||||
146
internal/runtime/executor/antigravity_reasoning_replay_test.go
Normal file
146
internal/runtime/executor/antigravity_reasoning_replay_test.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
internalcache "github.com/router-for-me/CLIProxyAPI/v7/internal/cache"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func TestAntigravityReasoningReplayAccumulatorMultiToolSSEChunks(t *testing.T) {
|
||||
internalcache.ClearAntigravityReasoningReplayCache()
|
||||
t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
|
||||
|
||||
requestPayload := []byte(`{"sessionId":"sess-1","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`)
|
||||
scope := antigravityReasoningReplayScope{modelName: "gemini-3-flash-agent", sessionKey: "session:sess-1"}
|
||||
acc := newAntigravityReasoningReplayAccumulator(scope, requestPayload)
|
||||
if acc == nil {
|
||||
t.Fatal("accumulator is nil")
|
||||
}
|
||||
if acc.contentIndex != 1 || acc.nextPartIndex != 0 {
|
||||
t.Fatalf("pending model slot = %d/%d, want 1/0", acc.contentIndex, acc.nextPartIndex)
|
||||
}
|
||||
|
||||
line1 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"thoughtSignature":"sig-first","functionCall":{"name":"Read","args":{"file_path":"/a"},"id":"id1"}}]}}]}}`)
|
||||
line2 := []byte(`data: {"response":{"candidates":[{"content":{"parts":[{"functionCall":{"name":"Read","args":{"file_path":"/b"},"id":"id2"}}]}}]}}`)
|
||||
acc.ObserveSSELine(line1)
|
||||
acc.ObserveSSELine(line2)
|
||||
acc.Flush(context.Background())
|
||||
|
||||
items, ok := internalcache.GetAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-1")
|
||||
if !ok || len(items) != 2 {
|
||||
t.Fatalf("cached items = %v ok=%v, want 2 items", len(items), ok)
|
||||
}
|
||||
pi0 := int(gjson.GetBytes(items[0], "partIndex").Int())
|
||||
pi1 := int(gjson.GetBytes(items[1], "partIndex").Int())
|
||||
if pi0 != 0 || pi1 != 1 {
|
||||
t.Fatalf("partIndex = %d,%d, want 0,1", pi0, pi1)
|
||||
}
|
||||
if got := gjson.GetBytes(items[0], "thoughtSignature").String(); got != "sig-first" {
|
||||
t.Fatalf("first sig = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepareAntigravityGeminiReasoningReplayPayloadInjectsCachedToolPart(t *testing.T) {
|
||||
internalcache.ClearAntigravityReasoningReplayCache()
|
||||
t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
|
||||
|
||||
item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`)
|
||||
if !internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-2", [][]byte{item}) {
|
||||
t.Fatal("cache write failed")
|
||||
}
|
||||
|
||||
req := cliproxyexecutor.Request{}
|
||||
opts := cliproxyexecutor.Options{}
|
||||
payload := []byte(`{"sessionId":"sess-2","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`)
|
||||
out, scope, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", req, opts, payload)
|
||||
if err != nil {
|
||||
t.Fatalf("prepare error: %v", err)
|
||||
}
|
||||
if !scope.valid() {
|
||||
t.Fatal("scope invalid")
|
||||
}
|
||||
if gjson.GetBytes(out, "request.contents.1.role").String() != "model" {
|
||||
t.Fatalf("functionCall replay must be model role at [1], got %s", string(out))
|
||||
}
|
||||
if got := gjson.GetBytes(out, "request.contents.1.parts.0.thoughtSignature").String(); got != "sig-first" {
|
||||
t.Fatalf("thoughtSignature = %q, want sig-first", got)
|
||||
}
|
||||
if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() {
|
||||
t.Fatalf("functionCall not injected: %s", string(out))
|
||||
}
|
||||
if !gjson.GetBytes(out, "request.contents.2.parts.0.functionResponse").Exists() {
|
||||
t.Fatalf("functionResponse should follow model functionCall at [2]: %s", string(out))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepareAntigravityGeminiReasoningReplayInsertsBeforeModelFunctionResponse(t *testing.T) {
|
||||
internalcache.ClearAntigravityReasoningReplayCache()
|
||||
t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
|
||||
|
||||
item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`)
|
||||
internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-3", [][]byte{item})
|
||||
|
||||
payload := []byte(`{"sessionId":"sess-3","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"model","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`)
|
||||
out, _, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", cliproxyexecutor.Request{}, cliproxyexecutor.Options{}, payload)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !gjson.GetBytes(out, "request.contents.1.parts.0.functionCall").Exists() || gjson.GetBytes(out, "request.contents.1.role").String() != "model" {
|
||||
t.Fatalf("want model functionCall at [1]: %s", string(out))
|
||||
}
|
||||
if !gjson.GetBytes(out, "request.contents.2.parts.0.functionResponse").Exists() {
|
||||
t.Fatalf("functionResponse should be at [2]: %s", string(out))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeAntigravityFunctionCallPartReplayMergesSignatureIntoExistingFunctionCall(t *testing.T) {
|
||||
internalcache.ClearAntigravityReasoningReplayCache()
|
||||
t.Cleanup(internalcache.ClearAntigravityReasoningReplayCache)
|
||||
|
||||
item := []byte(`{"type":"function_call_part","contentIndex":1,"partIndex":0,"name":"Read","call_id":"id1","args":{"file_path":"/a"},"thoughtSignature":"sig-first"}`)
|
||||
internalcache.CacheAntigravityReasoningReplayItems("gemini-3-flash-agent", "session:sess-merge", [][]byte{item})
|
||||
|
||||
payload := []byte(`{"sessionId":"sess-merge","request":{"contents":[{"role":"user","parts":[{"text":"hi"}]},{"role":"model","parts":[{"functionCall":{"id":"id1","name":"Read","args":{"file_path":"/a"}}}]},{"role":"user","parts":[{"functionResponse":{"id":"id1","name":"Read","response":{"result":"ok"}}}]}]}}`)
|
||||
out, _, err := prepareAntigravityGeminiReasoningReplayPayload(context.Background(), "gemini-3-flash-agent", cliproxyexecutor.Request{}, cliproxyexecutor.Options{}, payload)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got := gjson.GetBytes(out, "request.contents.1.parts.0.thoughtSignature").String(); got != "sig-first" {
|
||||
t.Fatalf("thoughtSignature = %q, want sig-first; body=%s", got, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAntigravityReasoningReplayScopeUsesStableSessionWithoutSessionId(t *testing.T) {
|
||||
payload := []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"stable-user-text"}]}]}}`)
|
||||
scope := antigravityReasoningReplayScopeFromPayload("gemini-3-flash-agent", payload)
|
||||
if !scope.valid() {
|
||||
t.Fatal("scope should be valid from stable session hash")
|
||||
}
|
||||
if !strings.HasPrefix(scope.sessionKey, "session:") {
|
||||
t.Fatalf("sessionKey = %q", scope.sessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAntigravityReplayToolCallKeysUsesNativeFunctionCallID(t *testing.T) {
|
||||
fc := gjson.Parse(`{"name":"Read","args":{"file_path":"/a"},"id":"id-native"}`)
|
||||
keys := antigravityReplayToolCallKeysFromPart(fc)
|
||||
if len(keys) != 1 {
|
||||
t.Fatalf("keys = %v", keys)
|
||||
}
|
||||
fc2 := gjson.Parse(`{"name":"Read","args":{"file_path":"/a"},"id":"id-native-2"}`)
|
||||
keys2 := antigravityReplayToolCallKeysFromPart(fc2)
|
||||
if keys[0] == keys2[0] {
|
||||
t.Fatalf("parallel tool calls should not share replay key: %v vs %v", keys, keys2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAntigravityRequestHasMatchingFunctionResponseWhitespaceCallID(t *testing.T) {
|
||||
item := gjson.Parse(`{"call_id":" "}`)
|
||||
if !antigravityRequestHasMatchingFunctionResponse(nil, item) {
|
||||
t.Fatal("whitespace-only call_id should be treated as empty => true")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user