fix: 修复 thinking 内容泄漏到正文 + PR#51 keepalive 定时器泄漏 (#53)

1. Thinking 提取修复 (Issue #53):
   - 用 indexOf/lastIndexOf 替换所有非贪婪正则 [\\s\\S]*?
   - 防止 thinking 内容包含 </thinking> 字面量时提前截断
   - 新增 extractThinking() 工具函数统一处理
   - 修复涉及 handler.ts、openai-handler.ts、streaming-text.ts

2. PR#51 keepalive 修复:
   - handleDirectTextStream 新增独立 keepalive + try/finally 清理
   - handleStream 的 keepalive 仅在工具模式分支创建,避免泄漏
This commit is contained in:
小海
2026-03-17 09:00:34 +08:00
parent 66b5d5d0b3
commit 91679d39b3
3 changed files with 91 additions and 33 deletions

View File

@@ -140,6 +140,38 @@ export function isRefusal(text: string): boolean {
return REFUSAL_PATTERNS.some(p => p.test(text));
}
// ==================== Thinking 提取 ====================
const THINKING_OPEN = '<thinking>';
const THINKING_CLOSE = '</thinking>';
/**
* 安全提取 thinking 内容并返回剥离后的正文。
*
* ★ 使用 indexOf + lastIndexOf 而非非贪婪正则 [\s\S]*?
* 防止 thinking 内容本身包含 </thinking> 字面量时提前截断,
* 导致 thinking 后半段 + 闭合标签泄漏到正文。
*/
export function extractThinking(text: string): { thinkingContent: string; strippedText: string } {
const startIdx = text.indexOf(THINKING_OPEN);
if (startIdx === -1) return { thinkingContent: '', strippedText: text };
const contentStart = startIdx + THINKING_OPEN.length;
const endIdx = text.lastIndexOf(THINKING_CLOSE);
if (endIdx > startIdx) {
return {
thinkingContent: text.slice(contentStart, endIdx).trim(),
strippedText: (text.slice(0, startIdx) + text.slice(endIdx + THINKING_CLOSE.length)).trim(),
};
}
// 未闭合(流式截断)→ thinking 取到末尾,正文为开头部分
return {
thinkingContent: text.slice(contentStart).trim(),
strippedText: text.slice(0, startIdx).trim(),
};
}
// ==================== 模型列表 ====================
export function listModels(_req: Request, res: Response): void {
@@ -686,6 +718,16 @@ async function handleDirectTextStream(
clientRequestedThinking: boolean,
streamState: { blockIndex: number; textBlockStarted: boolean; thinkingEmitted: boolean },
): Promise<void> {
// ★ 流式保活:增量流式路径也需要 keepalive防止 thinking 缓冲期间网关 504
const keepaliveInterval = setInterval(() => {
try {
res.write(': keepalive\n\n');
// @ts-expect-error flush exists on ServerResponse when compression is used
if (typeof res.flush === 'function') res.flush();
} catch { /* connection already closed, ignore */ }
}, 15000);
try {
let activeCursorReq = cursorReq;
let retryCount = 0;
let finalRawResponse = '';
@@ -816,9 +858,9 @@ async function handleDirectTextStream(
});
if (!finalThinkingContent && finalRawResponse.includes('<thinking>')) {
const thinkingMatch = finalRawResponse.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
finalThinkingContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
const { thinkingContent: extracted } = extractThinking(finalRawResponse);
if (extracted) {
finalThinkingContent = extracted;
}
}
@@ -875,6 +917,9 @@ async function handleDirectTextStream(
log.complete(finalRecordedResponse.length, 'end_turn');
res.end();
} finally {
clearInterval(keepaliveInterval);
}
}
// ==================== 流式处理 ====================
@@ -902,14 +947,9 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
},
});
// ★ 流式保活:在缓冲/续写期间每 15s 发送 SSE 注释,防止网关判定连接空闲超时 (504)
const keepaliveInterval = setInterval(() => {
try {
res.write(': keepalive\n\n');
// @ts-expect-error flush exists on ServerResponse when compression is used
if (typeof res.flush === 'function') res.flush();
} catch { /* connection already closed, ignore */ }
}, 15000);
// ★ 流式保活 — 注意无工具的增量流式路径handleDirectTextStream有自己的 keepalive
// 这里的 keepalive 仅用于工具模式下的缓冲/续写期间
let keepaliveInterval: ReturnType<typeof setInterval> | undefined;
let fullResponse = '';
let sentText = '';
@@ -945,6 +985,15 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
return;
}
// 工具模式:创建 keepalive无工具路径已在 handleDirectTextStream 内部处理)
keepaliveInterval = setInterval(() => {
try {
res.write(': keepalive\n\n');
// @ts-expect-error flush exists on ServerResponse when compression is used
if (typeof res.flush === 'function') res.flush();
} catch { /* connection already closed, ignore */ }
}, 15000);
await executeStream();
log.recordRawResponse(fullResponse);
@@ -956,14 +1005,14 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
// ★ Thinking 提取(在拒绝检测之前,防止 thinking 内容触发 isRefusal 误判)
let thinkingContent = '';
if (fullResponse.includes('<thinking>')) {
const thinkingMatch = fullResponse.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
thinkingContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
const { thinkingContent: extracted, strippedText } = extractThinking(fullResponse);
if (extracted) {
thinkingContent = extracted;
log.recordThinking(thinkingContent);
log.updateSummary({ thinkingChars: thinkingContent.length });
if (clientRequestedThinking) {
// 客户端原生请求 thinking → 剥离标签,稍后发送 thinking content block
fullResponse = fullResponse.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
fullResponse = strippedText;
log.info('Handler', 'thinking', `剥离 thinking → content block: ${thinkingContent.length} chars, 剩余 ${fullResponse.length} chars`);
} else {
// proxy 注入的 thinking → 保留标签在正文中Claude Code 可直接显示
@@ -977,7 +1026,7 @@ async function handleStream(res: Response, cursorReq: CursorChatRequest, body: A
// 否则 thinking 中的反思性语言(如 "haven't given a specific task")会触发误判
const getTextForRefusalCheck = () => {
if (fullResponse.includes('<thinking>')) {
return fullResponse.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
return extractThinking(fullResponse).strippedText;
}
return fullResponse;
};
@@ -1360,11 +1409,11 @@ async function handleNonStream(res: Response, cursorReq: CursorChatRequest, body
// ★ Thinking 提取(在拒绝检测之前)
let thinkingContent = '';
if (fullText.includes('<thinking>')) {
const thinkingMatch = fullText.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
thinkingContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
const { thinkingContent: extracted, strippedText } = extractThinking(fullText);
if (extracted) {
thinkingContent = extracted;
if (clientRequestedThinking) {
fullText = fullText.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
fullText = strippedText;
log.info('Handler', 'thinking', `非流式剥离 thinking → content block: ${thinkingContent.length} chars, 剩余 ${fullText.length} chars`);
} else {
log.info('Handler', 'thinking', `非流式保留 thinking 在正文中: ${thinkingContent.length} chars`);
@@ -1376,7 +1425,7 @@ async function handleNonStream(res: Response, cursorReq: CursorChatRequest, body
// ★ 关键:拒绝检测必须在 thinking-stripped 文本上进行
const getTextForRefusalCheck = () => {
if (fullText.includes('<thinking>')) {
return fullText.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
return extractThinking(fullText).strippedText;
}
return fullText;
};

View File

@@ -35,6 +35,7 @@ import {
isIdentityProbe,
isToolCapabilityQuestion,
buildRetryRequest,
extractThinking,
CLAUDE_IDENTITY_RESPONSE,
CLAUDE_TOOLS_RESPONSE,
MAX_REFUSAL_RETRIES,
@@ -615,12 +616,12 @@ async function handleOpenAIStream(
const thinkingEnabled = anthropicReq.thinking?.type === 'enabled';
let reasoningContent: string | undefined;
if (fullResponse.includes('<thinking>')) {
const thinkingMatch = fullResponse.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
const { thinkingContent: extracted, strippedText } = extractThinking(fullResponse);
if (extracted) {
if (thinkingEnabled) {
reasoningContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
reasoningContent = extracted;
}
fullResponse = stripThinkingTags(fullResponse);
fullResponse = strippedText;
// thinking 剥离记录在详细日志中
}
}
@@ -819,14 +820,13 @@ async function handleOpenAINonStream(
const thinkingEnabled = anthropicReq.thinking?.type === 'enabled';
let reasoningContent: string | undefined;
if (fullText.includes('<thinking>')) {
const thinkingMatch = fullText.match(/<thinking>([\s\S]*?)<\/thinking>/g);
if (thinkingMatch) {
const { thinkingContent: extracted, strippedText } = extractThinking(fullText);
if (extracted) {
if (thinkingEnabled) {
reasoningContent = thinkingMatch.map(m => m.replace(/<\/?thinking>/g, '').trim()).join('\n\n');
reasoningContent = extracted;
}
const stripped = fullText.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
// thinking 剥离记录
fullText = stripped;
fullText = strippedText;
}
}
@@ -841,7 +841,7 @@ async function handleOpenAINonStream(
fullText = await sendCursorRequestFull(retryCursorReq);
// 重试响应也需要先剥离 thinking
if (fullText.includes('<thinking>')) {
fullText = fullText.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
fullText = extractThinking(fullText).strippedText;
}
if (!shouldRetry()) break;
}

View File

@@ -37,10 +37,19 @@ const STREAM_START_BOUNDARY_RE = /[\n。.!?]/;
/**
* 剥离完整的 thinking 标签,返回可用于拒绝检测或最终文本处理的正文。
*
* ★ 使用 indexOf + lastIndexOf 而非非贪婪正则,防止 thinking 内容本身
* 包含 </thinking> 字面量时提前截断导致标签泄漏到正文。
*/
export function stripThinkingTags(text: string): string {
if (!text) return text;
return text.replace(/<thinking>[\s\S]*?<\/thinking>\s*/g, '').trim();
if (!text || !text.includes(THINKING_OPEN)) return text;
const startIdx = text.indexOf(THINKING_OPEN);
const endIdx = text.lastIndexOf(THINKING_CLOSE);
if (endIdx > startIdx) {
return (text.slice(0, startIdx) + text.slice(endIdx + THINKING_CLOSE.length)).trim();
}
// 未闭合(流式截断)→ 剥离从 <thinking> 开始的全部内容
return text.slice(0, startIdx).trim();
}
/**