From 0958ca3c2a7e4374c41c824eb4df7e360acbf4ec Mon Sep 17 00:00:00 2001 From: wsyh4567 Date: Sat, 28 Mar 2026 00:38:08 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20SSE=20=E4=BA=8B=E4=BB=B6=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E9=94=99=E8=AF=AF=E5=AF=BC=E8=87=B4=20Codex=20CLI=20v?= =?UTF-8?q?0.117+=20=E5=87=BA=E7=8E=B0=20stream=20closed=20before=20respon?= =?UTF-8?q?se.completed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复两处导致 Codex CLI v0.117+ 报错 "stream closed before response.completed" 的 bug: Bug 1:writeResponsesSSE 发送的 data JSON 缺少 "type" 字段 Codex 的 ResponsesStreamEvent 结构体要求每条事件的 data 必须包含 "type" 字段,否则整条事件解析失败被跳过。 修复:将 JSON.stringify(data) 改为 JSON.stringify({ type: eventType, ...data }) Bug 2(主要原因):response 对象未嵌套在 "response" 字段下 Codex 处理 response.completed / response.created / response.in_progress 时, 从 event.response 字段读取 response 对象。cursor2api 原本将 buildResponseObject() 的字段全部展开到顶层,导致 event.response = null, 整个事件被静默忽略,等流关闭时报 "stream closed before response.completed"。 修复:所有 buildResponseObject() 调用改为 { response: buildResponseObject(...) } 共修改 8 处(emitResponsesTextStream、工具调用路径、错误回退路径) 参考:https://github.com/openai/codex/blob/main/codex-rs/codex-api/src/sse/responses.rs 关联 Issue:#114 --- src/openai-handler.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/openai-handler.ts b/src/openai-handler.ts index a505a6c..967ea77 100644 --- a/src/openai-handler.ts +++ b/src/openai-handler.ts @@ -1295,7 +1295,7 @@ function writeOpenAISSE(res: Response, data: OpenAIChatCompletionChunk): void { * 注意:与 Chat Completions 的 "data: {json}\n\n" 不同,Responses API 需要 event: 前缀 */ function writeResponsesSSE(res: Response, eventType: string, data: Record): void { - res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`); + res.write(`event: ${eventType}\ndata: ${JSON.stringify({ type: eventType, ...data })}\n\n`); if (typeof (res as unknown as { flush: () => void }).flush === 'function') { (res as unknown as { flush: () => void }).flush(); } @@ -1458,10 +1458,10 @@ function emitResponsesTextStream( const allOutputItems = toolCallItems ? [...toolCallItems, messageItem] : [messageItem]; // 1. response.created - writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', [])); + writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) }); // 2. response.in_progress - writeResponsesSSE(res, 'response.in_progress', buildResponseObject(respId, model, 'in_progress', [])); + writeResponsesSSE(res, 'response.in_progress', { response: buildResponseObject(respId, model, 'in_progress', []) }); // 3. 文本 output item writeResponsesSSE(res, 'response.output_item.added', { @@ -1516,7 +1516,7 @@ function emitResponsesTextStream( }); // 9. response.completed — ★ 这是 Codex 等待的关键事件 - writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', allOutputItems, usage)); + writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', allOutputItems, usage) }); } /** @@ -1624,8 +1624,8 @@ async function handleResponsesStream( log.recordToolCalls(toolCalls); log.updateSummary({ toolCallsDetected: toolCalls.length }); // 1. response.created + response.in_progress - writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', [])); - writeResponsesSSE(res, 'response.in_progress', buildResponseObject(respId, model, 'in_progress', [])); + writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) }); + writeResponsesSSE(res, 'response.in_progress', { response: buildResponseObject(respId, model, 'in_progress', []) }); const allOutputItems: Record[] = []; let outputIndex = 0; @@ -1713,7 +1713,7 @@ async function handleResponsesStream( } // 4. response.completed — ★ Codex 等待的关键事件 - writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', allOutputItems, usage)); + writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', allOutputItems, usage) }); } else { // 工具调用解析失败(误报)→ 作为纯文本发送 const msgItemId = responsesItemId(); @@ -1734,7 +1734,7 @@ async function handleResponsesStream( try { const errorText = `[Error: ${message}]`; const errorItemId = responsesItemId(); - writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', [])); + writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) }); writeResponsesSSE(res, 'response.output_item.added', { output_index: 0, item: { id: errorItemId, type: 'message', role: 'assistant', status: 'in_progress', content: [] }, @@ -1757,10 +1757,10 @@ async function handleResponsesStream( output_index: 0, item: { id: errorItemId, type: 'message', role: 'assistant', status: 'completed', content: [{ type: 'output_text', text: errorText, annotations: [] }] }, }); - writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', [{ + writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', [{ id: errorItemId, type: 'message', role: 'assistant', status: 'completed', content: [{ type: 'output_text', text: errorText, annotations: [] }], - }], { input_tokens: 0, output_tokens: 10, total_tokens: 10 })); + }], { input_tokens: 0, output_tokens: 10, total_tokens: 10 }) }); } catch { /* ignore double error */ } } finally { clearInterval(keepaliveInterval);