Improve incremental text streaming output

This commit is contained in:
xiaoyun172
2026-03-16 21:58:09 +08:00
parent a7994ccbd5
commit ece889101c
3 changed files with 627 additions and 1 deletions

View File

@@ -19,6 +19,7 @@ import { convertToCursorRequest, parseToolCalls, hasToolCalls } from './converte
import { sendCursorRequest, sendCursorRequestFull } from './cursor-client.js';
import { getConfig } from './config.js';
import { createRequestLogger, type RequestLogger } from './logger.js';
import { createIncrementalTextStreamer, splitLeadingThinkingBlocks, stripThinkingTags } from './streaming-text.js';
function msgId(): string {
return 'msg_' + uuidv4().replace(/-/g, '').substring(0, 24);
@@ -628,6 +629,254 @@ export function buildRetryRequest(body: AnthropicRequest, attempt: number): Anth
return { ...body, messages: newMessages };
}
function writeAnthropicTextDelta(
res: Response,
state: { blockIndex: number; textBlockStarted: boolean },
text: string,
): void {
if (!text) return;
if (!state.textBlockStarted) {
writeSSE(res, 'content_block_start', {
type: 'content_block_start',
index: state.blockIndex,
content_block: { type: 'text', text: '' },
});
state.textBlockStarted = true;
}
writeSSE(res, 'content_block_delta', {
type: 'content_block_delta',
index: state.blockIndex,
delta: { type: 'text_delta', text },
});
}
function emitAnthropicThinkingBlock(
res: Response,
state: { blockIndex: number; textBlockStarted: boolean; thinkingEmitted: boolean },
thinkingContent: string,
): void {
if (!thinkingContent || state.thinkingEmitted) return;
writeSSE(res, 'content_block_start', {
type: 'content_block_start',
index: state.blockIndex,
content_block: { type: 'thinking', thinking: '' },
});
writeSSE(res, 'content_block_delta', {
type: 'content_block_delta',
index: state.blockIndex,
delta: { type: 'thinking_delta', thinking: thinkingContent },
});
writeSSE(res, 'content_block_stop', {
type: 'content_block_stop',
index: state.blockIndex,
});
state.blockIndex++;
state.thinkingEmitted = true;
}
async function handleDirectTextStream(
res: Response,
cursorReq: CursorChatRequest,
body: AnthropicRequest,
log: RequestLogger,
clientRequestedThinking: boolean,
streamState: { blockIndex: number; textBlockStarted: boolean; thinkingEmitted: boolean },
): Promise<void> {
let activeCursorReq = cursorReq;
let retryCount = 0;
let finalRawResponse = '';
let finalVisibleText = '';
let finalThinkingContent = '';
let streamer = createIncrementalTextStreamer({
transform: sanitizeResponse,
isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)),
});
const executeAttempt = async (): Promise<{
rawResponse: string;
visibleText: string;
thinkingContent: string;
streamer: ReturnType<typeof createIncrementalTextStreamer>;
}> => {
let rawResponse = '';
let visibleText = '';
let leadingBuffer = '';
let leadingResolved = !clientRequestedThinking;
let thinkingContent = '';
const attemptStreamer = createIncrementalTextStreamer({
transform: sanitizeResponse,
isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)),
});
const flushVisible = (chunk: string): void => {
if (!chunk) return;
visibleText += chunk;
const delta = attemptStreamer.push(chunk);
if (!delta) return;
if (clientRequestedThinking && thinkingContent && !streamState.thinkingEmitted) {
emitAnthropicThinkingBlock(res, streamState, thinkingContent);
}
writeAnthropicTextDelta(res, streamState, delta);
};
const apiStart = Date.now();
let firstChunk = true;
log.startPhase('send', '发送到 Cursor');
await sendCursorRequest(activeCursorReq, (event: CursorSSEEvent) => {
if (event.type !== 'text-delta' || !event.delta) return;
if (firstChunk) {
log.recordTTFT();
log.endPhase();
log.startPhase('response', '接收响应');
firstChunk = false;
}
rawResponse += event.delta;
if (!clientRequestedThinking) {
flushVisible(event.delta);
return;
}
if (!leadingResolved) {
leadingBuffer += event.delta;
const split = splitLeadingThinkingBlocks(leadingBuffer);
if (split.startedWithThinking) {
if (!split.complete) return;
thinkingContent = split.thinkingContent;
leadingResolved = true;
leadingBuffer = '';
flushVisible(split.remainder);
return;
}
leadingResolved = true;
const buffered = leadingBuffer;
leadingBuffer = '';
flushVisible(buffered);
return;
}
flushVisible(event.delta);
});
if (firstChunk) {
log.endPhase();
} else {
log.endPhase();
}
log.recordCursorApiTime(apiStart);
return {
rawResponse,
visibleText: clientRequestedThinking ? visibleText : rawResponse,
thinkingContent,
streamer: attemptStreamer,
};
};
while (true) {
const attempt = await executeAttempt();
finalRawResponse = attempt.rawResponse;
finalVisibleText = attempt.visibleText;
finalThinkingContent = attempt.thinkingContent;
streamer = attempt.streamer;
const textForRefusalCheck = clientRequestedThinking
? finalVisibleText
: stripThinkingTags(finalRawResponse);
if (!streamer.hasSentText() && isRefusal(textForRefusalCheck) && retryCount < MAX_REFUSAL_RETRIES) {
retryCount++;
log.warn('Handler', 'retry', `检测到拒绝(第${retryCount}次),自动重试`, {
preview: textForRefusalCheck.substring(0, 200),
});
log.updateSummary({ retryCount });
const retryBody = buildRetryRequest(body, retryCount - 1);
activeCursorReq = await convertToCursorRequest(retryBody);
continue;
}
break;
}
log.recordRawResponse(finalRawResponse);
log.info('Handler', 'response', `原始响应: ${finalRawResponse.length} chars`, {
preview: finalRawResponse.substring(0, 300),
hasTools: false,
});
if (!finalThinkingContent && finalRawResponse.includes('<thinking>')) {
const thinkingMatch = finalRawResponse.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
finalThinkingContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
}
}
if (finalThinkingContent) {
log.recordThinking(finalThinkingContent);
log.updateSummary({ thinkingChars: finalThinkingContent.length });
if (clientRequestedThinking) {
log.info('Handler', 'thinking', `剥离 thinking → content block: ${finalThinkingContent.length} chars, 剩余 ${finalVisibleText.length} chars`);
} else {
log.info('Handler', 'thinking', `保留 thinking 在正文中 (非客户端请求): ${finalThinkingContent.length} chars`);
}
}
let finalTextToSend: string;
const refusalText = clientRequestedThinking ? finalVisibleText : stripThinkingTags(finalRawResponse);
const usedFallback = !streamer.hasSentText() && isRefusal(refusalText);
if (usedFallback) {
if (isToolCapabilityQuestion(body)) {
log.info('Handler', 'refusal', '工具能力询问被拒绝 → 返回 Claude 能力描述');
finalTextToSend = CLAUDE_TOOLS_RESPONSE;
} else {
log.warn('Handler', 'refusal', `重试${MAX_REFUSAL_RETRIES}次后仍被拒绝 → 降级为 Claude 身份回复`);
finalTextToSend = CLAUDE_IDENTITY_RESPONSE;
}
} else {
finalTextToSend = streamer.finish();
}
if (!usedFallback && clientRequestedThinking && finalThinkingContent && !streamState.thinkingEmitted) {
emitAnthropicThinkingBlock(res, streamState, finalThinkingContent);
}
writeAnthropicTextDelta(res, streamState, finalTextToSend);
if (streamState.textBlockStarted) {
writeSSE(res, 'content_block_stop', {
type: 'content_block_stop',
index: streamState.blockIndex,
});
streamState.blockIndex++;
}
writeSSE(res, 'message_delta', {
type: 'message_delta',
delta: { stop_reason: 'end_turn', stop_sequence: null },
usage: { output_tokens: Math.ceil((streamer.hasSentText() ? (finalVisibleText || finalRawResponse) : finalTextToSend).length / 4) },
});
writeSSE(res, 'message_stop', { type: 'message_stop' });
const finalRecordedResponse = streamer.hasSentText()
? sanitizeResponse(clientRequestedThinking ? finalVisibleText : finalRawResponse)
: finalTextToSend;
log.recordFinalResponse(finalRecordedResponse);
log.complete(finalRecordedResponse.length, 'end_turn');
res.end();
}
// ==================== 流式处理 ====================
async function handleStream(res: Response, cursorReq: CursorChatRequest, body: AnthropicRequest, log: RequestLogger, clientRequestedThinking: boolean = false): Promise<void> {
@@ -666,6 +915,7 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
let sentText = '';
let blockIndex = 0;
let textBlockStarted = false;
let thinkingBlockEmitted = false;
// 无工具模式:先缓冲全部响应再检测拒绝,如果是拒绝则重试
let activeCursorReq = cursorReq;
@@ -686,6 +936,15 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
};
try {
if (!hasTools) {
await handleDirectTextStream(res, cursorReq, body, log, clientRequestedThinking, {
blockIndex,
textBlockStarted,
thinkingEmitted: thinkingBlockEmitted,
});
return;
}
await executeStream();
log.recordRawResponse(fullResponse);

View File

@@ -28,6 +28,7 @@ import { convertToCursorRequest, parseToolCalls, hasToolCalls } from './converte
import { sendCursorRequest, sendCursorRequestFull } from './cursor-client.js';
import { getConfig } from './config.js';
import { createRequestLogger } from './logger.js';
import { createIncrementalTextStreamer, splitLeadingThinkingBlocks, stripThinkingTags } from './streaming-text.js';
import {
isRefusal,
sanitizeResponse,
@@ -375,6 +376,187 @@ function handleOpenAIMockNonStream(res: Response, body: OpenAIChatRequest, mockT
});
}
function writeOpenAITextDelta(
res: Response,
id: string,
created: number,
model: string,
text: string,
): void {
if (!text) return;
writeOpenAISSE(res, {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{
index: 0,
delta: { content: text },
finish_reason: null,
}],
});
}
function writeOpenAIReasoningDelta(
res: Response,
id: string,
created: number,
model: string,
reasoningContent: string,
): void {
if (!reasoningContent) return;
writeOpenAISSE(res, {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{
index: 0,
delta: { reasoning_content: reasoningContent } as Record<string, unknown>,
finish_reason: null,
}],
});
}
async function handleOpenAIIncrementalTextStream(
res: Response,
cursorReq: CursorChatRequest,
body: OpenAIChatRequest,
anthropicReq: AnthropicRequest,
streamMeta: { id: string; created: number; model: string },
): Promise<void> {
let activeCursorReq = cursorReq;
let retryCount = 0;
const thinkingEnabled = anthropicReq.thinking?.type === 'enabled';
let finalRawResponse = '';
let finalVisibleText = '';
let finalReasoningContent = '';
let streamer = createIncrementalTextStreamer({
transform: sanitizeResponse,
isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)),
});
let reasoningSent = false;
const executeAttempt = async (): Promise<{
rawResponse: string;
visibleText: string;
reasoningContent: string;
streamer: ReturnType<typeof createIncrementalTextStreamer>;
}> => {
let rawResponse = '';
let visibleText = '';
let leadingBuffer = '';
let leadingResolved = false;
let reasoningContent = '';
const attemptStreamer = createIncrementalTextStreamer({
transform: sanitizeResponse,
isBlockedPrefix: (text) => isRefusal(text.substring(0, 300)),
});
const flushVisible = (chunk: string): void => {
if (!chunk) return;
visibleText += chunk;
const delta = attemptStreamer.push(chunk);
if (!delta) return;
if (thinkingEnabled && reasoningContent && !reasoningSent) {
writeOpenAIReasoningDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, reasoningContent);
reasoningSent = true;
}
writeOpenAITextDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, delta);
};
await sendCursorRequest(activeCursorReq, (event: CursorSSEEvent) => {
if (event.type !== 'text-delta' || !event.delta) return;
rawResponse += event.delta;
if (!leadingResolved) {
leadingBuffer += event.delta;
const split = splitLeadingThinkingBlocks(leadingBuffer);
if (split.startedWithThinking) {
if (!split.complete) return;
reasoningContent = split.thinkingContent;
leadingResolved = true;
leadingBuffer = '';
flushVisible(split.remainder);
return;
}
leadingResolved = true;
const buffered = leadingBuffer;
leadingBuffer = '';
flushVisible(buffered);
return;
}
flushVisible(event.delta);
});
return {
rawResponse,
visibleText,
reasoningContent,
streamer: attemptStreamer,
};
};
while (true) {
const attempt = await executeAttempt();
finalRawResponse = attempt.rawResponse;
finalVisibleText = attempt.visibleText;
finalReasoningContent = attempt.reasoningContent;
streamer = attempt.streamer;
const textForRefusalCheck = finalVisibleText;
if (!streamer.hasSentText() && isRefusal(textForRefusalCheck) && retryCount < MAX_REFUSAL_RETRIES) {
retryCount++;
const retryBody = buildRetryRequest(anthropicReq, retryCount - 1);
activeCursorReq = await convertToCursorRequest(retryBody);
reasoningSent = false;
continue;
}
break;
}
const refusalText = finalVisibleText;
const usedFallback = !streamer.hasSentText() && isRefusal(refusalText);
let finalTextToSend: string;
if (usedFallback) {
finalTextToSend = isToolCapabilityQuestion(anthropicReq)
? CLAUDE_TOOLS_RESPONSE
: CLAUDE_IDENTITY_RESPONSE;
} else {
finalTextToSend = streamer.finish();
}
if (!usedFallback && thinkingEnabled && finalReasoningContent && !reasoningSent) {
writeOpenAIReasoningDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, finalReasoningContent);
reasoningSent = true;
}
writeOpenAITextDelta(res, streamMeta.id, streamMeta.created, streamMeta.model, finalTextToSend);
writeOpenAISSE(res, {
id: streamMeta.id,
object: 'chat.completion.chunk',
created: streamMeta.created,
model: streamMeta.model,
choices: [{
index: 0,
delta: {},
finish_reason: 'stop',
}],
});
res.write('data: [DONE]\n\n');
res.end();
}
// ==================== 流式处理OpenAI SSE 格式) ====================
async function handleOpenAIStream(
@@ -420,6 +602,11 @@ async function handleOpenAIStream(
};
try {
if (!hasTools && (!body.response_format || body.response_format.type === 'text')) {
await handleOpenAIIncrementalTextStream(res, cursorReq, body, anthropicReq, { id, created, model });
return;
}
await executeStream();
// 日志记录在详细日志中 (Web UI 可见)
@@ -433,7 +620,7 @@ async function handleOpenAIStream(
if (thinkingEnabled) {
reasoningContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
}
fullResponse = fullResponse.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
fullResponse = stripThinkingTags(fullResponse);
// thinking 剥离记录在详细日志中
}
}

180
src/streaming-text.ts Normal file
View File

@@ -0,0 +1,180 @@
/**
* streaming-text.ts - 流式文本增量释放辅助
*
* 目标:
* 1. 为纯正文流提供更接近“打字效果”的增量输出
* 2. 在真正开始向客户端输出前,先保留一小段预热文本,降低拒绝前缀泄漏概率
* 3. 发送时保留尾部保护窗口,给跨 chunk 的清洗规则预留上下文
*/
export interface LeadingThinkingSplit {
startedWithThinking: boolean;
complete: boolean;
thinkingContent: string;
remainder: string;
}
export interface IncrementalTextStreamerOptions {
warmupChars?: number;
guardChars?: number;
transform?: (text: string) => string;
isBlockedPrefix?: (text: string) => boolean;
}
export interface IncrementalTextStreamer {
push(chunk: string): string;
finish(): string;
hasUnlocked(): boolean;
hasSentText(): boolean;
getRawText(): string;
}
const THINKING_OPEN = '<thinking>';
const THINKING_CLOSE = '</thinking>';
const DEFAULT_WARMUP_CHARS = 96;
const DEFAULT_GUARD_CHARS = 256;
const STREAM_START_BOUNDARY_RE = /[\n。.!?]/;
/**
* 剥离完整的 thinking 标签,返回可用于拒绝检测或最终文本处理的正文。
*/
export function stripThinkingTags(text: string): string {
if (!text) return text;
return text.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
}
/**
* 只解析“前导 thinking 块”。
*
* Cursor 的 thinking 通常位于响应最前面,正文随后出现。
* 这里仅处理前导块,避免把正文中的普通文本误判成 thinking 标签。
*/
export function splitLeadingThinkingBlocks(text: string): LeadingThinkingSplit {
if (!text) {
return {
startedWithThinking: false,
complete: false,
thinkingContent: '',
remainder: '',
};
}
const trimmed = text.trimStart();
if (!trimmed.startsWith(THINKING_OPEN)) {
return {
startedWithThinking: false,
complete: false,
thinkingContent: '',
remainder: text,
};
}
let cursor = trimmed;
const thinkingParts: string[] = [];
while (cursor.startsWith(THINKING_OPEN)) {
const closeIndex = cursor.indexOf(THINKING_CLOSE, THINKING_OPEN.length);
if (closeIndex === -1) {
return {
startedWithThinking: true,
complete: false,
thinkingContent: '',
remainder: '',
};
}
const content = cursor.slice(THINKING_OPEN.length, closeIndex).trim();
if (content) thinkingParts.push(content);
cursor = cursor.slice(closeIndex + THINKING_CLOSE.length).trimStart();
}
return {
startedWithThinking: true,
complete: true,
thinkingContent: thinkingParts.join('\n\n'),
remainder: cursor,
};
}
/**
* 创建增量文本释放器。
*
* 释放策略:
* - 先缓冲一小段,确认不像拒绝前缀,再开始输出
* - 输出时总是保留尾部 guardChars不把“边界附近”的文本过早发出去
* - 最终 finish() 时再把剩余文本一次性补齐
*/
export function createIncrementalTextStreamer(
options: IncrementalTextStreamerOptions = {},
): IncrementalTextStreamer {
const warmupChars = options.warmupChars ?? DEFAULT_WARMUP_CHARS;
const guardChars = options.guardChars ?? DEFAULT_GUARD_CHARS;
const transform = options.transform ?? ((text: string) => text);
const isBlockedPrefix = options.isBlockedPrefix ?? (() => false);
let rawText = '';
let sentText = '';
let unlocked = false;
let sentAny = false;
const tryUnlock = (): boolean => {
if (unlocked) return true;
const preview = transform(rawText);
if (!preview.trim()) return false;
const hasBoundary = STREAM_START_BOUNDARY_RE.test(preview);
const enoughChars = preview.length >= warmupChars;
if (!hasBoundary && !enoughChars) {
return false;
}
if (isBlockedPrefix(preview.trim())) {
return false;
}
unlocked = true;
return true;
};
const emitFromRawLength = (rawLength: number): string => {
const transformed = transform(rawText.slice(0, rawLength));
if (transformed.length <= sentText.length) return '';
const delta = transformed.slice(sentText.length);
sentText = transformed;
if (delta) sentAny = true;
return delta;
};
return {
push(chunk: string): string {
if (!chunk) return '';
rawText += chunk;
if (!tryUnlock()) return '';
const safeRawLength = Math.max(0, rawText.length - guardChars);
if (safeRawLength <= 0) return '';
return emitFromRawLength(safeRawLength);
},
finish(): string {
if (!rawText) return '';
return emitFromRawLength(rawText.length);
},
hasUnlocked(): boolean {
return unlocked;
},
hasSentText(): boolean {
return sentAny;
},
getRawText(): string {
return rawText;
},
};
}