fix: SSE 事件格式错误导致 Codex CLI v0.117+ 出现 stream closed before response.completed

修复两处导致 Codex CLI v0.117+ 报错 "stream closed before response.completed" 的 bug:

Bug 1:writeResponsesSSE 发送的 data JSON 缺少 "type" 字段
  Codex 的 ResponsesStreamEvent 结构体要求每条事件的 data 必须包含
  "type" 字段,否则整条事件解析失败被跳过。

  修复:将 JSON.stringify(data) 改为 JSON.stringify({ type: eventType, ...data })

Bug 2(主要原因):response 对象未嵌套在 "response" 字段下
  Codex 处理 response.completed / response.created / response.in_progress 时,
  从 event.response 字段读取 response 对象。cursor2api 原本将
  buildResponseObject() 的字段全部展开到顶层,导致 event.response = null,
  整个事件被静默忽略,等流关闭时报 "stream closed before response.completed"。

  修复:所有 buildResponseObject() 调用改为 { response: buildResponseObject(...) }
  共修改 8 处(emitResponsesTextStream、工具调用路径、错误回退路径)

参考:https://github.com/openai/codex/blob/main/codex-rs/codex-api/src/sse/responses.rs
关联 Issue:#114
This commit is contained in:
wsyh4567
2026-03-28 00:38:08 +08:00
parent 0716b602ac
commit 0958ca3c2a

View File

@@ -1295,7 +1295,7 @@ function writeOpenAISSE(res: Response, data: OpenAIChatCompletionChunk): void {
* 注意:与 Chat Completions 的 "data: {json}\n\n" 不同Responses API 需要 event: 前缀
*/
function writeResponsesSSE(res: Response, eventType: string, data: Record<string, unknown>): void {
res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`);
res.write(`event: ${eventType}\ndata: ${JSON.stringify({ type: eventType, ...data })}\n\n`);
if (typeof (res as unknown as { flush: () => void }).flush === 'function') {
(res as unknown as { flush: () => void }).flush();
}
@@ -1458,10 +1458,10 @@ function emitResponsesTextStream(
const allOutputItems = toolCallItems ? [...toolCallItems, messageItem] : [messageItem];
// 1. response.created
writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', []));
writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) });
// 2. response.in_progress
writeResponsesSSE(res, 'response.in_progress', buildResponseObject(respId, model, 'in_progress', []));
writeResponsesSSE(res, 'response.in_progress', { response: buildResponseObject(respId, model, 'in_progress', []) });
// 3. 文本 output item
writeResponsesSSE(res, 'response.output_item.added', {
@@ -1516,7 +1516,7 @@ function emitResponsesTextStream(
});
// 9. response.completed — ★ 这是 Codex 等待的关键事件
writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', allOutputItems, usage));
writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', allOutputItems, usage) });
}
/**
@@ -1624,8 +1624,8 @@ async function handleResponsesStream(
log.recordToolCalls(toolCalls);
log.updateSummary({ toolCallsDetected: toolCalls.length });
// 1. response.created + response.in_progress
writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', []));
writeResponsesSSE(res, 'response.in_progress', buildResponseObject(respId, model, 'in_progress', []));
writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) });
writeResponsesSSE(res, 'response.in_progress', { response: buildResponseObject(respId, model, 'in_progress', []) });
const allOutputItems: Record<string, unknown>[] = [];
let outputIndex = 0;
@@ -1713,7 +1713,7 @@ async function handleResponsesStream(
}
// 4. response.completed — ★ Codex 等待的关键事件
writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', allOutputItems, usage));
writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', allOutputItems, usage) });
} else {
// 工具调用解析失败(误报)→ 作为纯文本发送
const msgItemId = responsesItemId();
@@ -1734,7 +1734,7 @@ async function handleResponsesStream(
try {
const errorText = `[Error: ${message}]`;
const errorItemId = responsesItemId();
writeResponsesSSE(res, 'response.created', buildResponseObject(respId, model, 'in_progress', []));
writeResponsesSSE(res, 'response.created', { response: buildResponseObject(respId, model, 'in_progress', []) });
writeResponsesSSE(res, 'response.output_item.added', {
output_index: 0,
item: { id: errorItemId, type: 'message', role: 'assistant', status: 'in_progress', content: [] },
@@ -1757,10 +1757,10 @@ async function handleResponsesStream(
output_index: 0,
item: { id: errorItemId, type: 'message', role: 'assistant', status: 'completed', content: [{ type: 'output_text', text: errorText, annotations: [] }] },
});
writeResponsesSSE(res, 'response.completed', buildResponseObject(respId, model, 'completed', [{
writeResponsesSSE(res, 'response.completed', { response: buildResponseObject(respId, model, 'completed', [{
id: errorItemId, type: 'message', role: 'assistant', status: 'completed',
content: [{ type: 'output_text', text: errorText, annotations: [] }],
}], { input_tokens: 0, output_tokens: 10, total_tokens: 10 }));
}], { input_tokens: 0, output_tokens: 10, total_tokens: 10 }) });
} catch { /* ignore double error */ }
} finally {
clearInterval(keepaliveInterval);