diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index 578977d62..8dd1a0a7b 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -148,9 +148,12 @@ func responsesSSEFrameWithData(frame, payload []byte) []byte { out.Write(line) out.WriteByte('\n') } - out.WriteString("data: ") - out.Write(payload) - out.WriteString("\n\n") + for _, line := range bytes.Split(payload, []byte("\n")) { + out.WriteString("data: ") + out.Write(line) + out.WriteByte('\n') + } + out.WriteByte('\n') return out.Bytes() } diff --git a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go index 3851278fb..151da9a79 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go +++ b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go @@ -122,6 +122,40 @@ func TestForwardResponsesStreamRepairsMixedIndexedAndUnindexedDoneItems(t *testi } } +func TestForwardResponsesStreamRepairsMultilineCompletedOutputAsSSEDataLines(t *testing.T) { + h, recorder, c, flusher := newResponsesStreamTestHandler(t) + + data := make(chan []byte, 2) + errs := make(chan *interfaces.ErrorMessage) + data <- []byte(`data: {"type":"response.output_item.done","item":{"type":"function_call","arguments":"{}"}}`) + data <- []byte("data: {\"type\":\"response.completed\",\ndata: \"response\":{\"id\":\"resp-1\",\"output\":[]}}\n\n") + close(data) + close(errs) + + h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil) + + parts := strings.Split(strings.TrimSpace(recorder.Body.String()), "\n\n") + if len(parts) != 2 { + t.Fatalf("expected 2 SSE events, got %d. Body: %q", len(parts), recorder.Body.String()) + } + + completedFrame := []byte(parts[1]) + for _, line := range strings.Split(parts[1], "\n") { + if line != "" && !strings.HasPrefix(line, "data: ") { + t.Fatalf("expected every completed payload line to be an SSE data line, got %q in %q", line, parts[1]) + } + } + + payload, ok := responsesSSEDataPayload(completedFrame) + if !ok { + t.Fatalf("expected completed frame to contain data payload: %q", parts[1]) + } + output := gjson.GetBytes(payload, "response.output") + if !output.IsArray() || len(output.Array()) != 1 { + t.Fatalf("expected repaired completed output with 1 item, got %s from %q", output.Raw, payload) + } +} + func TestForwardResponsesStreamReassemblesSplitSSEEventChunks(t *testing.T) { h, recorder, c, flusher := newResponsesStreamTestHandler(t)