From ece889101ce85455f1b77af015b0d03a796d8bbb Mon Sep 17 00:00:00 2001 From: xiaoyun172 Date: Mon, 16 Mar 2026 21:58:09 +0800 Subject: [PATCH] Improve incremental text streaming output --- src/handler.ts | 259 ++++++++++++++++++++++++++++++++++++++++++ src/openai-handler.ts | 189 +++++++++++++++++++++++++++++- src/streaming-text.ts | 180 +++++++++++++++++++++++++++++ 3 files changed, 627 insertions(+), 1 deletion(-) create mode 100644 src/streaming-text.ts diff --git a/src/handler.ts b/src/handler.ts index 0f65126..a0a8c4e 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -19,6 +19,7 @@ import { convertToCursorRequest, parseToolCalls, hasToolCalls } from './converte import { sendCursorRequest, sendCursorRequestFull } from './cursor-client.js'; import { getConfig } from './config.js'; import { createRequestLogger, type RequestLogger } from './logger.js'; +import { createIncrementalTextStreamer, splitLeadingThinkingBlocks, stripThinkingTags } from './streaming-text.js'; function msgId(): string { return 'msg_' + uuidv4().replace(/-/g, '').substring(0, 24); @@ -628,6 +629,254 @@ export function buildRetryRequest(body: AnthropicRequest, attempt: number): Anth return { ...body, messages: newMessages }; } +function writeAnthropicTextDelta( + res: Response, + state: { blockIndex: number; textBlockStarted: boolean }, + text: string, +): void { + if (!text) return; + + if (!state.textBlockStarted) { + writeSSE(res, 'content_block_start', { + type: 'content_block_start', + index: state.blockIndex, + content_block: { type: 'text', text: '' }, + }); + state.textBlockStarted = true; + } + + writeSSE(res, 'content_block_delta', { + type: 'content_block_delta', + index: state.blockIndex, + delta: { type: 'text_delta', text }, + }); +} + +function emitAnthropicThinkingBlock( + res: Response, + state: { blockIndex: number; textBlockStarted: boolean; thinkingEmitted: boolean }, + thinkingContent: string, +): void { + if (!thinkingContent || state.thinkingEmitted) return; + + writeSSE(res, 'content_block_start', { + type: 'content_block_start', + index: state.blockIndex, + content_block: { type: 'thinking', thinking: '' }, + }); + writeSSE(res, 'content_block_delta', { + type: 'content_block_delta', + index: state.blockIndex, + delta: { type: 'thinking_delta', thinking: thinkingContent }, + }); + writeSSE(res, 'content_block_stop', { + type: 'content_block_stop', + index: state.blockIndex, + }); + + state.blockIndex++; + state.thinkingEmitted = true; +} + +async function handleDirectTextStream( + res: Response, + cursorReq: CursorChatRequest, + body: AnthropicRequest, + log: RequestLogger, + clientRequestedThinking: boolean, + streamState: { blockIndex: number; textBlockStarted: boolean; thinkingEmitted: boolean }, +): Promise { + let activeCursorReq = cursorReq; + let retryCount = 0; + let finalRawResponse = ''; + let finalVisibleText = ''; + let finalThinkingContent = ''; + let streamer = createIncrementalTextStreamer({ + transform: sanitizeResponse, + isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)), + }); + + const executeAttempt = async (): Promise<{ + rawResponse: string; + visibleText: string; + thinkingContent: string; + streamer: ReturnType; + }> => { + let rawResponse = ''; + let visibleText = ''; + let leadingBuffer = ''; + let leadingResolved = !clientRequestedThinking; + let thinkingContent = ''; + const attemptStreamer = createIncrementalTextStreamer({ + transform: sanitizeResponse, + isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)), + }); + + const flushVisible = (chunk: string): void => { + if (!chunk) return; + visibleText += chunk; + const delta = attemptStreamer.push(chunk); + if (!delta) return; + + if (clientRequestedThinking && thinkingContent && !streamState.thinkingEmitted) { + emitAnthropicThinkingBlock(res, streamState, thinkingContent); + } + writeAnthropicTextDelta(res, streamState, delta); + }; + + const apiStart = Date.now(); + let firstChunk = true; + log.startPhase('send', '发送到 Cursor'); + + await sendCursorRequest(activeCursorReq, (event: CursorSSEEvent) => { + if (event.type !== 'text-delta' || !event.delta) return; + + if (firstChunk) { + log.recordTTFT(); + log.endPhase(); + log.startPhase('response', '接收响应'); + firstChunk = false; + } + + rawResponse += event.delta; + + if (!clientRequestedThinking) { + flushVisible(event.delta); + return; + } + + if (!leadingResolved) { + leadingBuffer += event.delta; + const split = splitLeadingThinkingBlocks(leadingBuffer); + + if (split.startedWithThinking) { + if (!split.complete) return; + thinkingContent = split.thinkingContent; + leadingResolved = true; + leadingBuffer = ''; + flushVisible(split.remainder); + return; + } + + leadingResolved = true; + const buffered = leadingBuffer; + leadingBuffer = ''; + flushVisible(buffered); + return; + } + + flushVisible(event.delta); + }); + + if (firstChunk) { + log.endPhase(); + } else { + log.endPhase(); + } + + log.recordCursorApiTime(apiStart); + + return { + rawResponse, + visibleText: clientRequestedThinking ? visibleText : rawResponse, + thinkingContent, + streamer: attemptStreamer, + }; + }; + + while (true) { + const attempt = await executeAttempt(); + finalRawResponse = attempt.rawResponse; + finalVisibleText = attempt.visibleText; + finalThinkingContent = attempt.thinkingContent; + streamer = attempt.streamer; + + const textForRefusalCheck = clientRequestedThinking + ? finalVisibleText + : stripThinkingTags(finalRawResponse); + + if (!streamer.hasSentText() && isRefusal(textForRefusalCheck) && retryCount < MAX_REFUSAL_RETRIES) { + retryCount++; + log.warn('Handler', 'retry', `检测到拒绝(第${retryCount}次),自动重试`, { + preview: textForRefusalCheck.substring(0, 200), + }); + log.updateSummary({ retryCount }); + const retryBody = buildRetryRequest(body, retryCount - 1); + activeCursorReq = await convertToCursorRequest(retryBody); + continue; + } + + break; + } + + log.recordRawResponse(finalRawResponse); + log.info('Handler', 'response', `原始响应: ${finalRawResponse.length} chars`, { + preview: finalRawResponse.substring(0, 300), + hasTools: false, + }); + + if (!finalThinkingContent && finalRawResponse.includes('')) { + const thinkingMatch = finalRawResponse.match(/([\s\S]*?)<\/thinking>/g); + if (thinkingMatch) { + finalThinkingContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n'); + } + } + + if (finalThinkingContent) { + log.recordThinking(finalThinkingContent); + log.updateSummary({ thinkingChars: finalThinkingContent.length }); + if (clientRequestedThinking) { + log.info('Handler', 'thinking', `剥离 thinking → content block: ${finalThinkingContent.length} chars, 剩余 ${finalVisibleText.length} chars`); + } else { + log.info('Handler', 'thinking', `保留 thinking 在正文中 (非客户端请求): ${finalThinkingContent.length} chars`); + } + } + + let finalTextToSend: string; + const refusalText = clientRequestedThinking ? finalVisibleText : stripThinkingTags(finalRawResponse); + const usedFallback = !streamer.hasSentText() && isRefusal(refusalText); + if (usedFallback) { + if (isToolCapabilityQuestion(body)) { + log.info('Handler', 'refusal', '工具能力询问被拒绝 → 返回 Claude 能力描述'); + finalTextToSend = CLAUDE_TOOLS_RESPONSE; + } else { + log.warn('Handler', 'refusal', `重试${MAX_REFUSAL_RETRIES}次后仍被拒绝 → 降级为 Claude 身份回复`); + finalTextToSend = CLAUDE_IDENTITY_RESPONSE; + } + } else { + finalTextToSend = streamer.finish(); + } + + if (!usedFallback && clientRequestedThinking && finalThinkingContent && !streamState.thinkingEmitted) { + emitAnthropicThinkingBlock(res, streamState, finalThinkingContent); + } + + writeAnthropicTextDelta(res, streamState, finalTextToSend); + + if (streamState.textBlockStarted) { + writeSSE(res, 'content_block_stop', { + type: 'content_block_stop', + index: streamState.blockIndex, + }); + streamState.blockIndex++; + } + + writeSSE(res, 'message_delta', { + type: 'message_delta', + delta: { stop_reason: 'end_turn', stop_sequence: null }, + usage: { output_tokens: Math.ceil((streamer.hasSentText() ? (finalVisibleText || finalRawResponse) : finalTextToSend).length / 4) }, + }); + writeSSE(res, 'message_stop', { type: 'message_stop' }); + + const finalRecordedResponse = streamer.hasSentText() + ? sanitizeResponse(clientRequestedThinking ? finalVisibleText : finalRawResponse) + : finalTextToSend; + log.recordFinalResponse(finalRecordedResponse); + log.complete(finalRecordedResponse.length, 'end_turn'); + + res.end(); +} + // ==================== 流式处理 ==================== async function handleStream(res: Response, cursorReq: CursorChatRequest, body: AnthropicRequest, log: RequestLogger, clientRequestedThinking: boolean = false): Promise { @@ -666,6 +915,7 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A let sentText = ''; let blockIndex = 0; let textBlockStarted = false; + let thinkingBlockEmitted = false; // 无工具模式:先缓冲全部响应再检测拒绝,如果是拒绝则重试 let activeCursorReq = cursorReq; @@ -686,6 +936,15 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A }; try { + if (!hasTools) { + await handleDirectTextStream(res, cursorReq, body, log, clientRequestedThinking, { + blockIndex, + textBlockStarted, + thinkingEmitted: thinkingBlockEmitted, + }); + return; + } + await executeStream(); log.recordRawResponse(fullResponse); diff --git a/src/openai-handler.ts b/src/openai-handler.ts index 1a0e008..efcd4e0 100644 --- a/src/openai-handler.ts +++ b/src/openai-handler.ts @@ -28,6 +28,7 @@ import { convertToCursorRequest, parseToolCalls, hasToolCalls } from './converte import { sendCursorRequest, sendCursorRequestFull } from './cursor-client.js'; import { getConfig } from './config.js'; import { createRequestLogger } from './logger.js'; +import { createIncrementalTextStreamer, splitLeadingThinkingBlocks, stripThinkingTags } from './streaming-text.js'; import { isRefusal, sanitizeResponse, @@ -375,6 +376,187 @@ function handleOpenAIMockNonStream(res: Response, body: OpenAIChatRequest, mockT }); } +function writeOpenAITextDelta( + res: Response, + id: string, + created: number, + model: string, + text: string, +): void { + if (!text) return; + writeOpenAISSE(res, { + id, + object: 'chat.completion.chunk', + created, + model, + choices: [{ + index: 0, + delta: { content: text }, + finish_reason: null, + }], + }); +} + +function writeOpenAIReasoningDelta( + res: Response, + id: string, + created: number, + model: string, + reasoningContent: string, +): void { + if (!reasoningContent) return; + writeOpenAISSE(res, { + id, + object: 'chat.completion.chunk', + created, + model, + choices: [{ + index: 0, + delta: { reasoning_content: reasoningContent } as Record, + finish_reason: null, + }], + }); +} + +async function handleOpenAIIncrementalTextStream( + res: Response, + cursorReq: CursorChatRequest, + body: OpenAIChatRequest, + anthropicReq: AnthropicRequest, + streamMeta: { id: string; created: number; model: string }, +): Promise { + let activeCursorReq = cursorReq; + let retryCount = 0; + const thinkingEnabled = anthropicReq.thinking?.type === 'enabled'; + let finalRawResponse = ''; + let finalVisibleText = ''; + let finalReasoningContent = ''; + let streamer = createIncrementalTextStreamer({ + transform: sanitizeResponse, + isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)), + }); + let reasoningSent = false; + + const executeAttempt = async (): Promise<{ + rawResponse: string; + visibleText: string; + reasoningContent: string; + streamer: ReturnType; + }> => { + let rawResponse = ''; + let visibleText = ''; + let leadingBuffer = ''; + let leadingResolved = false; + let reasoningContent = ''; + const attemptStreamer = createIncrementalTextStreamer({ + transform: sanitizeResponse, + isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)), + }); + + const flushVisible = (chunk: string): void => { + if (!chunk) return; + visibleText += chunk; + const delta = attemptStreamer.push(chunk); + if (!delta) return; + + if (thinkingEnabled && reasoningContent && !reasoningSent) { + writeOpenAIReasoningDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, reasoningContent); + reasoningSent = true; + } + writeOpenAITextDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, delta); + }; + + await sendCursorRequest(activeCursorReq, (event: CursorSSEEvent) => { + if (event.type !== 'text-delta' || !event.delta) return; + + rawResponse += event.delta; + + if (!leadingResolved) { + leadingBuffer += event.delta; + const split = splitLeadingThinkingBlocks(leadingBuffer); + + if (split.startedWithThinking) { + if (!split.complete) return; + reasoningContent = split.thinkingContent; + leadingResolved = true; + leadingBuffer = ''; + flushVisible(split.remainder); + return; + } + + leadingResolved = true; + const buffered = leadingBuffer; + leadingBuffer = ''; + flushVisible(buffered); + return; + } + + flushVisible(event.delta); + }); + + return { + rawResponse, + visibleText, + reasoningContent, + streamer: attemptStreamer, + }; + }; + + while (true) { + const attempt = await executeAttempt(); + finalRawResponse = attempt.rawResponse; + finalVisibleText = attempt.visibleText; + finalReasoningContent = attempt.reasoningContent; + streamer = attempt.streamer; + + const textForRefusalCheck = finalVisibleText; + + if (!streamer.hasSentText() && isRefusal(textForRefusalCheck) && retryCount < MAX_REFUSAL_RETRIES) { + retryCount++; + const retryBody = buildRetryRequest(anthropicReq, retryCount - 1); + activeCursorReq = await convertToCursorRequest(retryBody); + reasoningSent = false; + continue; + } + + break; + } + + const refusalText = finalVisibleText; + const usedFallback = !streamer.hasSentText() && isRefusal(refusalText); + + let finalTextToSend: string; + if (usedFallback) { + finalTextToSend = isToolCapabilityQuestion(anthropicReq) + ? CLAUDE_TOOLS_RESPONSE + : CLAUDE_IDENTITY_RESPONSE; + } else { + finalTextToSend = streamer.finish(); + } + + if (!usedFallback && thinkingEnabled && finalReasoningContent && !reasoningSent) { + writeOpenAIReasoningDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, finalReasoningContent); + reasoningSent = true; + } + + writeOpenAITextDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, finalTextToSend); + + writeOpenAISSE(res, { + id: streamMeta.id, + object: 'chat.completion.chunk', + created: streamMeta.created, + model: streamMeta.model, + choices: [{ + index: 0, + delta: {}, + finish_reason: 'stop', + }], + }); + + res.write('data: [DONE]\n\n'); + res.end(); +} + // ==================== 流式处理(OpenAI SSE 格式) ==================== async function handleOpenAIStream( @@ -420,6 +602,11 @@ async function handleOpenAIStream( }; try { + if (!hasTools && (!body.response_format || body.response_format.type === 'text')) { + await handleOpenAIIncrementalTextStream(res, cursorReq, body, anthropicReq, { id, created, model }); + return; + } + await executeStream(); // 日志记录在详细日志中 (Web UI 可见) @@ -433,7 +620,7 @@ async function handleOpenAIStream( if (thinkingEnabled) { reasoningContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n'); } - fullResponse = fullResponse.replace(/[\s\S]*?<\/thinking>\s*/g, '').trim(); + fullResponse = stripThinkingTags(fullResponse); // thinking 剥离记录在详细日志中 } } diff --git a/src/streaming-text.ts b/src/streaming-text.ts new file mode 100644 index 0000000..683adc2 --- /dev/null +++ b/src/streaming-text.ts @@ -0,0 +1,180 @@ +/** + * streaming-text.ts - 流式文本增量释放辅助 + * + * 目标: + * 1. 为纯正文流提供更接近“打字效果”的增量输出 + * 2. 在真正开始向客户端输出前,先保留一小段预热文本,降低拒绝前缀泄漏概率 + * 3. 发送时保留尾部保护窗口,给跨 chunk 的清洗规则预留上下文 + */ + +export interface LeadingThinkingSplit { + startedWithThinking: boolean; + complete: boolean; + thinkingContent: string; + remainder: string; +} + +export interface IncrementalTextStreamerOptions { + warmupChars?: number; + guardChars?: number; + transform?: (text: string) => string; + isBlockedPrefix?: (text: string) => boolean; +} + +export interface IncrementalTextStreamer { + push(chunk: string): string; + finish(): string; + hasUnlocked(): boolean; + hasSentText(): boolean; + getRawText(): string; +} + +const THINKING_OPEN = ''; +const THINKING_CLOSE = ''; +const DEFAULT_WARMUP_CHARS = 96; +const DEFAULT_GUARD_CHARS = 256; +const STREAM_START_BOUNDARY_RE = /[\n。!?.!?]/; + +/** + * 剥离完整的 thinking 标签,返回可用于拒绝检测或最终文本处理的正文。 + */ +export function stripThinkingTags(text: string): string { + if (!text) return text; + return text.replace(/[\s\S]*?<\/thinking>\s*/g, '').trim(); +} + +/** + * 只解析“前导 thinking 块”。 + * + * Cursor 的 thinking 通常位于响应最前面,正文随后出现。 + * 这里仅处理前导块,避免把正文中的普通文本误判成 thinking 标签。 + */ +export function splitLeadingThinkingBlocks(text: string): LeadingThinkingSplit { + if (!text) { + return { + startedWithThinking: false, + complete: false, + thinkingContent: '', + remainder: '', + }; + } + + const trimmed = text.trimStart(); + if (!trimmed.startsWith(THINKING_OPEN)) { + return { + startedWithThinking: false, + complete: false, + thinkingContent: '', + remainder: text, + }; + } + + let cursor = trimmed; + const thinkingParts: string[] = []; + + while (cursor.startsWith(THINKING_OPEN)) { + const closeIndex = cursor.indexOf(THINKING_CLOSE, THINKING_OPEN.length); + if (closeIndex === -1) { + return { + startedWithThinking: true, + complete: false, + thinkingContent: '', + remainder: '', + }; + } + + const content = cursor.slice(THINKING_OPEN.length, closeIndex).trim(); + if (content) thinkingParts.push(content); + cursor = cursor.slice(closeIndex + THINKING_CLOSE.length).trimStart(); + } + + return { + startedWithThinking: true, + complete: true, + thinkingContent: thinkingParts.join('\n\n'), + remainder: cursor, + }; +} + +/** + * 创建增量文本释放器。 + * + * 释放策略: + * - 先缓冲一小段,确认不像拒绝前缀,再开始输出 + * - 输出时总是保留尾部 guardChars,不把“边界附近”的文本过早发出去 + * - 最终 finish() 时再把剩余文本一次性补齐 + */ +export function createIncrementalTextStreamer( + options: IncrementalTextStreamerOptions = {}, +): IncrementalTextStreamer { + const warmupChars = options.warmupChars ?? DEFAULT_WARMUP_CHARS; + const guardChars = options.guardChars ?? DEFAULT_GUARD_CHARS; + const transform = options.transform ?? ((text: string) => text); + const isBlockedPrefix = options.isBlockedPrefix ?? (() => false); + + let rawText = ''; + let sentText = ''; + let unlocked = false; + let sentAny = false; + + const tryUnlock = (): boolean => { + if (unlocked) return true; + + const preview = transform(rawText); + if (!preview.trim()) return false; + + const hasBoundary = STREAM_START_BOUNDARY_RE.test(preview); + const enoughChars = preview.length >= warmupChars; + if (!hasBoundary && !enoughChars) { + return false; + } + + if (isBlockedPrefix(preview.trim())) { + return false; + } + + unlocked = true; + return true; + }; + + const emitFromRawLength = (rawLength: number): string => { + const transformed = transform(rawText.slice(0, rawLength)); + if (transformed.length <= sentText.length) return ''; + + const delta = transformed.slice(sentText.length); + sentText = transformed; + if (delta) sentAny = true; + return delta; + }; + + return { + push(chunk: string): string { + if (!chunk) return ''; + + rawText += chunk; + if (!tryUnlock()) return ''; + + const safeRawLength = Math.max(0, rawText.length - guardChars); + if (safeRawLength <= 0) return ''; + + return emitFromRawLength(safeRawLength); + }, + + finish(): string { + if (!rawText) return ''; + return emitFromRawLength(rawText.length); + }, + + hasUnlocked(): boolean { + return unlocked; + }, + + hasSentText(): boolean { + return sentAny; + }, + + getRawText(): string { + return rawText; + }, + }; +}