feat(cursor): conversation checkpoint + session_id for multi-turn context

- Capture conversation_checkpoint_update from Cursor server (was ignored)
- Store checkpoint per conversationId, replay as conversation_state on next request
- Use protowire to embed raw checkpoint bytes directly (no deserialization)
- Extract session_id from Claude Code metadata for stable conversationId across resume
- Flatten conversation history into userText as fallback when no checkpoint available
- Use conversationId as session key for reliable tool call resume
- Add checkpoint TTL cleanup (30min)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
黄姜恒
2026-03-26 10:51:47 +08:00
parent 9613f0b3f9
commit c95620f90e
4 changed files with 341 additions and 40 deletions

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# Binaries
cli-proxy-api
cliproxy
/server
*.exe

View File

@@ -35,6 +35,7 @@ const (
ServerMsgTurnEnded // Turn has ended (no more output)
ServerMsgHeartbeat // Server heartbeat
ServerMsgTokenDelta // Token usage delta
ServerMsgCheckpoint // Conversation checkpoint update
)
// DecodedServerMessage holds parsed data from an AgentServerMessage.
@@ -69,6 +70,9 @@ type DecodedServerMessage struct {
// For TokenDeltaUpdate
TokenDelta int64
// For conversation checkpoint update (raw bytes, not decoded)
CheckpointData []byte
}
// DecodeAgentServerMessage parses an AgentServerMessage and returns
@@ -104,8 +108,9 @@ func DecodeAgentServerMessage(data []byte) (*DecodedServerMessage, error) {
case ASM_KvServerMessage:
decodeKvServerMessage(val, msg)
case ASM_ConversationCheckpoint:
// Ignore checkpoint updates
log.Debugf("DecodeAgentServerMessage: ignoring ConversationCheckpoint")
msg.Type = ServerMsgCheckpoint
msg.CheckpointData = append([]byte(nil), val...) // copy raw bytes
log.Debugf("DecodeAgentServerMessage: captured checkpoint %d bytes", len(val))
}
case protowire.VarintType:

View File

@@ -10,6 +10,7 @@ import (
"fmt"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
@@ -29,6 +30,7 @@ type RunRequestParams struct {
Turns []TurnData
McpTools []McpToolDef
BlobStore map[string][]byte // hex(sha256) -> data, populated during encoding
RawCheckpoint []byte // if non-nil, use as conversation_state directly (from server checkpoint)
}
type ImageData struct {
@@ -102,7 +104,13 @@ func EncodeHeartbeat() []byte {
// EncodeRunRequest builds a full AgentClientMessage wrapping an AgentRunRequest.
// Mirrors buildCursorRequest() in cursor-fetch.ts.
// If p.RawCheckpoint is set, it is used directly as the conversation_state bytes
// (from a previous conversation_checkpoint_update), skipping manual turn construction.
func EncodeRunRequest(p *RunRequestParams) []byte {
if p.RawCheckpoint != nil {
return encodeRunRequestWithCheckpoint(p)
}
if p.BlobStore == nil {
p.BlobStore = make(map[string][]byte)
}
@@ -153,12 +161,19 @@ func EncodeRunRequest(p *RunRequestParams) []byte {
rootField := field(css, "root_prompt_messages_json")
rootList := css.Mutable(rootField).List()
rootList.Append(protoreflect.ValueOfBytes(blobId))
// turns: repeated bytes
// turns: repeated bytes (field 8) + turns_old (field 2) for compatibility
turnsField := field(css, "turns")
turnsList := css.Mutable(turnsField).List()
for _, tb := range turnBytes {
turnsList.Append(protoreflect.ValueOfBytes(tb))
}
turnsOldField := field(css, "turns_old")
if turnsOldField != nil {
turnsOldList := css.Mutable(turnsOldField).List()
for _, tb := range turnBytes {
turnsOldList.Append(protoreflect.ValueOfBytes(tb))
}
}
// --- UserMessage (current) ---
userMessage := newMsg("UserMessage")
@@ -227,6 +242,164 @@ func EncodeRunRequest(p *RunRequestParams) []byte {
return marshal(acm)
}
// encodeRunRequestWithCheckpoint builds an AgentClientMessage using a raw checkpoint
// as conversation_state. The checkpoint bytes are embedded directly without deserialization.
func encodeRunRequestWithCheckpoint(p *RunRequestParams) []byte {
// Build UserMessage
userMessage := newMsg("UserMessage")
setStr(userMessage, "text", p.UserText)
setStr(userMessage, "message_id", p.MessageId)
if len(p.Images) > 0 {
sc := newMsg("SelectedContext")
imgsField := field(sc, "selected_images")
imgsList := sc.Mutable(imgsField).List()
for _, img := range p.Images {
si := newMsg("SelectedImage")
setStr(si, "uuid", generateId())
setStr(si, "mime_type", img.MimeType)
setBytes(si, "data", img.Data)
imgsList.Append(protoreflect.ValueOfMessage(si.ProtoReflect()))
}
setMsg(userMessage, "selected_context", sc)
}
// Build ConversationAction with UserMessageAction
uma := newMsg("UserMessageAction")
setMsg(uma, "user_message", userMessage)
ca := newMsg("ConversationAction")
setMsg(ca, "user_message_action", uma)
caBytes := marshal(ca)
// Build ModelDetails
md := newMsg("ModelDetails")
setStr(md, "model_id", p.ModelId)
setStr(md, "display_model_id", p.ModelId)
setStr(md, "display_name", p.ModelId)
mdBytes := marshal(md)
// Build McpTools
var mcpToolsBytes []byte
if len(p.McpTools) > 0 {
mcpTools := newMsg("McpTools")
toolsField := field(mcpTools, "mcp_tools")
toolsList := mcpTools.Mutable(toolsField).List()
for _, tool := range p.McpTools {
td := newMsg("McpToolDefinition")
setStr(td, "name", tool.Name)
setStr(td, "description", tool.Description)
if len(tool.InputSchema) > 0 {
setBytes(td, "input_schema", jsonToProtobufValueBytes(tool.InputSchema))
}
setStr(td, "provider_identifier", "proxy")
setStr(td, "tool_name", tool.Name)
toolsList.Append(protoreflect.ValueOfMessage(td.ProtoReflect()))
}
mcpToolsBytes = marshal(mcpTools)
}
// Manually assemble AgentRunRequest using protowire to embed raw checkpoint
var arrBuf []byte
// field 1: conversation_state = raw checkpoint bytes (length-delimited)
arrBuf = protowire.AppendTag(arrBuf, ARR_ConversationState, protowire.BytesType)
arrBuf = protowire.AppendBytes(arrBuf, p.RawCheckpoint)
// field 2: action = ConversationAction
arrBuf = protowire.AppendTag(arrBuf, ARR_Action, protowire.BytesType)
arrBuf = protowire.AppendBytes(arrBuf, caBytes)
// field 3: model_details = ModelDetails
arrBuf = protowire.AppendTag(arrBuf, ARR_ModelDetails, protowire.BytesType)
arrBuf = protowire.AppendBytes(arrBuf, mdBytes)
// field 4: mcp_tools = McpTools
if len(mcpToolsBytes) > 0 {
arrBuf = protowire.AppendTag(arrBuf, ARR_McpTools, protowire.BytesType)
arrBuf = protowire.AppendBytes(arrBuf, mcpToolsBytes)
}
// field 5: conversation_id = string
if p.ConversationId != "" {
arrBuf = protowire.AppendTag(arrBuf, ARR_ConversationId, protowire.BytesType)
arrBuf = protowire.AppendString(arrBuf, p.ConversationId)
}
// Wrap in AgentClientMessage field 1 (run_request)
var acmBuf []byte
acmBuf = protowire.AppendTag(acmBuf, ACM_RunRequest, protowire.BytesType)
acmBuf = protowire.AppendBytes(acmBuf, arrBuf)
log.Debugf("cursor encode: built RunRequest with checkpoint (%d bytes), total=%d bytes", len(p.RawCheckpoint), len(acmBuf))
return acmBuf
}
// ResumeRequestParams holds data for a ResumeAction request.
type ResumeRequestParams struct {
ModelId string
ConversationId string
McpTools []McpToolDef
}
// EncodeResumeRequest builds an AgentClientMessage with ResumeAction.
// Used to resume a conversation by conversation_id without re-sending full history.
func EncodeResumeRequest(p *ResumeRequestParams) []byte {
// RequestContext with tools
rc := newMsg("RequestContext")
if len(p.McpTools) > 0 {
toolsField := field(rc, "tools")
toolsList := rc.Mutable(toolsField).List()
for _, tool := range p.McpTools {
td := newMsg("McpToolDefinition")
setStr(td, "name", tool.Name)
setStr(td, "description", tool.Description)
if len(tool.InputSchema) > 0 {
setBytes(td, "input_schema", jsonToProtobufValueBytes(tool.InputSchema))
}
setStr(td, "provider_identifier", "proxy")
setStr(td, "tool_name", tool.Name)
toolsList.Append(protoreflect.ValueOfMessage(td.ProtoReflect()))
}
}
// ResumeAction
ra := newMsg("ResumeAction")
setMsg(ra, "request_context", rc)
// ConversationAction with resume_action
ca := newMsg("ConversationAction")
setMsg(ca, "resume_action", ra)
// ModelDetails
md := newMsg("ModelDetails")
setStr(md, "model_id", p.ModelId)
setStr(md, "display_model_id", p.ModelId)
setStr(md, "display_name", p.ModelId)
// AgentRunRequest — no conversation_state needed for resume
arr := newMsg("AgentRunRequest")
setMsg(arr, "action", ca)
setMsg(arr, "model_details", md)
setStr(arr, "conversation_id", p.ConversationId)
// McpTools at top level
if len(p.McpTools) > 0 {
mcpTools := newMsg("McpTools")
toolsField := field(mcpTools, "mcp_tools")
toolsList := mcpTools.Mutable(toolsField).List()
for _, tool := range p.McpTools {
td := newMsg("McpToolDefinition")
setStr(td, "name", tool.Name)
setStr(td, "description", tool.Description)
if len(tool.InputSchema) > 0 {
setBytes(td, "input_schema", jsonToProtobufValueBytes(tool.InputSchema))
}
setStr(td, "provider_identifier", "proxy")
setStr(td, "tool_name", tool.Name)
toolsList.Append(protoreflect.ValueOfMessage(td.ProtoReflect()))
}
setMsg(arr, "mcp_tools", mcpTools)
}
acm := newMsg("AgentClientMessage")
setMsg(acm, "run_request", arr)
return marshal(acm)
}
// --- KV response encoders ---
// Mirrors handleKvMessage() in cursor-fetch.ts

View File

@@ -35,14 +35,23 @@ const (
cursorClientVersion = "cli-2026.02.13-41ac335"
cursorAuthType = "cursor"
cursorHeartbeatInterval = 5 * time.Second
cursorSessionTTL = 5 * time.Minute
cursorSessionTTL = 5 * time.Minute
cursorCheckpointTTL = 30 * time.Minute
)
// CursorExecutor handles requests to the Cursor API via Connect+Protobuf protocol.
type CursorExecutor struct {
cfg *config.Config
mu sync.Mutex
sessions map[string]*cursorSession
cfg *config.Config
mu sync.Mutex
sessions map[string]*cursorSession
checkpoints map[string]*savedCheckpoint // keyed by conversationId
}
// savedCheckpoint stores the server's conversation_checkpoint_update for reuse.
type savedCheckpoint struct {
data []byte // raw ConversationStateStructure protobuf bytes
blobStore map[string][]byte // blobs referenced by the checkpoint
updatedAt time.Time
}
type cursorSession struct {
@@ -68,8 +77,9 @@ type pendingMcpExec struct {
// NewCursorExecutor constructs a new executor instance.
func NewCursorExecutor(cfg *config.Config) *CursorExecutor {
e := &CursorExecutor{
cfg: cfg,
sessions: make(map[string]*cursorSession),
cfg: cfg,
sessions: make(map[string]*cursorSession),
checkpoints: make(map[string]*savedCheckpoint),
}
go e.cleanupLoop()
return e
@@ -106,6 +116,11 @@ func (e *CursorExecutor) cleanupLoop() {
delete(e.sessions, k)
}
}
for k, cp := range e.checkpoints {
if time.Since(cp.updatedAt) > cursorCheckpointTTL {
delete(e.checkpoints, k)
}
}
e.mu.Unlock()
}
}
@@ -209,8 +224,8 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
}
parsed := parseOpenAIRequest(payload)
cch := extractCCH(parsed.SystemPrompt)
conversationId := deriveConversationId(apiKeyFromContext(ctx), cch)
ccSessId := extractClaudeCodeSessionId(req.Payload)
conversationId := deriveConversationId(apiKeyFromContext(ctx), ccSessId, parsed.SystemPrompt)
params := buildRunRequestParams(parsed, conversationId)
requestBytes := cursorproto.EncodeRunRequest(params)
@@ -241,6 +256,7 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
nil,
nil,
nil, // tokenUsage - non-streaming
nil, // onCheckpoint - non-streaming doesn't persist
)
id := "chatcmpl-" + uuid.New().String()[:28]
@@ -279,6 +295,12 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
return nil, fmt.Errorf("cursor: access token not found")
}
// Extract session_id from metadata BEFORE translation (translation strips metadata)
ccSessionId := extractClaudeCodeSessionId(req.Payload)
if ccSessionId == "" && len(opts.OriginalRequest) > 0 {
ccSessionId = extractClaudeCodeSessionId(opts.OriginalRequest)
}
// Translate input to OpenAI format if needed
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -297,11 +319,11 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
log.Debugf("cursor: parsed request: model=%s userText=%d chars, turns=%d, tools=%d, toolResults=%d",
parsed.Model, len(parsed.UserText), len(parsed.Turns), len(parsed.Tools), len(parsed.ToolResults))
cch := extractCCH(parsed.SystemPrompt)
conversationId := deriveConversationId(apiKeyFromContext(ctx), cch)
log.Debugf("cursor: cch=%s conversationId=%s", cch, conversationId)
conversationId := deriveConversationId(apiKeyFromContext(ctx), ccSessionId, parsed.SystemPrompt)
log.Debugf("cursor: conversationId=%s ccSessionId=%s", conversationId, ccSessionId)
sessionKey := deriveSessionKey(apiKeyFromContext(ctx), parsed.Model, parsed.Messages)
// Use conversationId as session key — stable across requests in the same Claude Code session
sessionKey := conversationId
needsTranslate := from.String() != "" && from.String() != "openai"
// Check if we can resume an existing session with tool results
@@ -327,14 +349,33 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
e.mu.Unlock()
// If tool results exist but no session to resume, bake them into turns
// so the model sees tool interaction context in the new conversation.
if len(parsed.ToolResults) > 0 {
log.Debugf("cursor: no session to resume, baking %d tool results into turns", len(parsed.ToolResults))
bakeToolResultsIntoTurns(parsed)
}
// Look up saved checkpoint for this conversation
e.mu.Lock()
saved, hasCheckpoint := e.checkpoints[conversationId]
e.mu.Unlock()
params := buildRunRequestParams(parsed, conversationId)
if hasCheckpoint && saved.data != nil {
log.Debugf("cursor: using saved checkpoint (%d bytes) for conversationId=%s", len(saved.data), conversationId)
params.RawCheckpoint = saved.data
// Merge saved blobStore into params
if params.BlobStore == nil {
params.BlobStore = make(map[string][]byte)
}
for k, v := range saved.blobStore {
if _, exists := params.BlobStore[k]; !exists {
params.BlobStore[k] = v
}
}
} else if len(parsed.ToolResults) > 0 || len(parsed.Turns) > 0 {
// Fallback: no checkpoint available (cold resume / proxy restart).
// Flatten the full conversation history (including tool interactions) into userText.
// Cursor's turns encoding is not reliably read by the model, but userText always works.
log.Debugf("cursor: no checkpoint, flattening %d turns + %d tool results into userText", len(parsed.Turns), len(parsed.ToolResults))
flattenConversationIntoUserText(parsed)
params = buildRunRequestParams(parsed, conversationId)
}
requestBytes := cursorproto.EncodeRunRequest(params)
framedRequest := cursorproto.FrameConnectMessage(requestBytes, 0)
@@ -488,6 +529,17 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
},
toolResultCh,
usage,
func(cpData []byte) {
// Save checkpoint for this conversation
e.mu.Lock()
e.checkpoints[conversationId] = &savedCheckpoint{
data: cpData,
blobStore: params.BlobStore,
updatedAt: time.Now(),
}
e.mu.Unlock()
log.Debugf("cursor: saved checkpoint (%d bytes) for conversationId=%s", len(cpData), conversationId)
},
)
// processH2SessionFrames returned — stream is done
@@ -641,6 +693,7 @@ func processH2SessionFrames(
onMcpExec func(exec pendingMcpExec),
toolResultCh <-chan []toolResultInfo, // nil for no tool result injection; non-nil to wait for results
tokenUsage *cursorTokenUsage, // tracks accumulated token usage (may be nil)
onCheckpoint func(data []byte), // called when server sends conversation_checkpoint_update
) {
var buf bytes.Buffer
rejectReason := "Tool not available in this environment. Use the MCP tools provided instead."
@@ -711,6 +764,12 @@ func processH2SessionFrames(
// Server heartbeat, ignore silently
continue
case cursorproto.ServerMsgCheckpoint:
if onCheckpoint != nil && len(msg.CheckpointData) > 0 {
onCheckpoint(msg.CheckpointData)
}
continue
case cursorproto.ServerMsgTokenDelta:
if tokenUsage != nil && msg.TokenDelta > 0 {
tokenUsage.addOutput(msg.TokenDelta)
@@ -802,6 +861,10 @@ func processH2SessionFrames(
stream.Write(cursorproto.FrameConnectMessage(cursorproto.EncodeKvSetBlobResult(wmsg.KvId), 0))
case cursorproto.ServerMsgExecRequestCtx:
stream.Write(cursorproto.FrameConnectMessage(cursorproto.EncodeExecRequestContextResult(wmsg.ExecMsgId, wmsg.ExecId, mcpTools), 0))
case cursorproto.ServerMsgCheckpoint:
if onCheckpoint != nil && len(wmsg.CheckpointData) > 0 {
onCheckpoint(wmsg.CheckpointData)
}
}
}
case <-stream.Done():
@@ -948,22 +1011,56 @@ func parseOpenAIRequest(payload []byte) *parsedOpenAIRequest {
// bakeToolResultsIntoTurns merges tool results into the last turn's assistant text
// when there's no active H2 session to resume. This ensures the model sees the
// full tool interaction context in a new conversation.
func bakeToolResultsIntoTurns(parsed *parsedOpenAIRequest) {
if len(parsed.ToolResults) == 0 || len(parsed.Turns) == 0 {
return
// flattenConversationIntoUserText flattens the full conversation history
// (turns + tool results) into the UserText field as plain text.
// This is the fallback for cold resume when no checkpoint is available.
// Cursor reliably reads UserText but ignores structured turns.
func flattenConversationIntoUserText(parsed *parsedOpenAIRequest) {
var buf strings.Builder
// Flatten turns into readable context
for _, turn := range parsed.Turns {
if turn.UserText != "" {
buf.WriteString("USER: ")
buf.WriteString(turn.UserText)
buf.WriteString("\n\n")
}
if turn.AssistantText != "" {
buf.WriteString("ASSISTANT: ")
buf.WriteString(turn.AssistantText)
buf.WriteString("\n\n")
}
}
last := &parsed.Turns[len(parsed.Turns)-1]
var toolContext strings.Builder
// Flatten tool results
for _, tr := range parsed.ToolResults {
toolContext.WriteString("\n\n[Tool Result]\n")
toolContext.WriteString(tr.Content)
buf.WriteString("TOOL_RESULT (call_id: ")
buf.WriteString(tr.ToolCallId)
buf.WriteString("): ")
// Truncate very large tool results to avoid overwhelming the context
content := tr.Content
if len(content) > 8000 {
content = content[:8000] + "\n... [truncated]"
}
buf.WriteString(content)
buf.WriteString("\n\n")
}
if last.AssistantText != "" {
last.AssistantText += toolContext.String()
if buf.Len() > 0 {
buf.WriteString("The above is the previous conversation context including tool call results.\n")
buf.WriteString("Continue your response based on this context.\n\n")
}
// Prepend flattened history to the current UserText
if parsed.UserText != "" {
parsed.UserText = buf.String() + "Current request: " + parsed.UserText
} else {
last.AssistantText = toolContext.String()
parsed.UserText = buf.String() + "Continue from the conversation above."
}
parsed.ToolResults = nil // consumed
// Clear turns and tool results since they're now in UserText
parsed.Turns = nil
parsed.ToolResults = nil
}
func extractTextContent(content gjson.Result) string {
@@ -1096,8 +1193,6 @@ func newH2Client() *http.Client {
}
// extractCCH extracts the cch value from the system prompt's billing header.
// Format: x-anthropic-billing-header: cc_version=...; cc_entrypoint=cli; cch=XXXXX;
// The cch is unique per Claude Code session and stable across requests in the same session.
func extractCCH(systemPrompt string) string {
idx := strings.Index(systemPrompt, "cch=")
if idx < 0 {
@@ -1111,13 +1206,40 @@ func extractCCH(systemPrompt string) string {
return rest[:end]
}
// deriveConversationId generates a deterministic conversation_id from the client API key and cch.
// Same Claude Code session → same cch → same conversation_id → Cursor server can reuse context.
func deriveConversationId(apiKey, cch string) string {
if cch == "" {
return uuid.New().String()
// extractClaudeCodeSessionId extracts session_id from Claude Code's metadata.user_id JSON.
// Format: {"metadata":{"user_id":"{\"session_id\":\"xxx\",\"device_id\":\"yyy\"}"}}
func extractClaudeCodeSessionId(payload []byte) string {
userIdStr := gjson.GetBytes(payload, "metadata.user_id").String()
if userIdStr == "" {
return ""
}
h := sha256.Sum256([]byte("cursor-conv:" + apiKey + ":" + cch))
// user_id is a JSON string that needs to be parsed again
sid := gjson.Get(userIdStr, "session_id").String()
return sid
}
// deriveConversationId generates a deterministic conversation_id.
// Priority: session_id (stable across resume) > system prompt hash (fallback).
func deriveConversationId(apiKey, sessionId, systemPrompt string) string {
var input string
if sessionId != "" {
// Best: use Claude Code's session_id — stable even across resume
input = "cursor-conv:" + apiKey + ":" + sessionId
} else {
// Fallback: use system prompt content minus volatile cch
stable := systemPrompt
if idx := strings.Index(stable, "cch="); idx >= 0 {
end := strings.IndexAny(stable[idx:], "; \n")
if end > 0 {
stable = stable[:idx] + stable[idx+end:]
}
}
if len(stable) > 500 {
stable = stable[:500]
}
input = "cursor-conv:" + apiKey + ":" + stable
}
h := sha256.Sum256([]byte(input))
s := hex.EncodeToString(h[:16])
return fmt.Sprintf("%s-%s-%s-%s-%s", s[:8], s[8:12], s[12:16], s[16:20], s[20:32])
}