Merge pull request #3771 from Folyd/perf/codex-image-streaming-memory

perf(codex): avoid rebuilding completed JSON when extracting generated images
This commit is contained in:
Luis Pater
2026-06-09 01:49:12 +08:00
committed by GitHub
2 changed files with 156 additions and 28 deletions

View File

@@ -11,6 +11,7 @@ import (
"mime"
"mime/multipart"
"net/http"
"sort"
"strconv"
"strings"
"time"
@@ -150,8 +151,7 @@ func (e *CodexExecutor) executeOpenAIImage(ctx context.Context, auth *cliproxyau
reporter.Publish(ctx, detail)
}
publishCodexImageToolUsage(ctx, reporter, body, eventData)
completedData := patchCodexCompletedOutput(eventData, outputItemsByIndex, outputItemsFallback)
results, createdAt, usageRaw, firstMeta, errExtract := codexExtractImagesFromResponsesCompleted(completedData)
results, createdAt, usageRaw, firstMeta, errExtract := codexExtractImageResults(eventData, outputItemsByIndex, outputItemsFallback)
if errExtract != nil {
return resp, errExtract
}
@@ -275,8 +275,7 @@ func (e *CodexExecutor) executeOpenAIImageStream(ctx context.Context, auth *clip
reporter.Publish(ctx, detail)
}
publishCodexImageToolUsage(ctx, reporter, body, eventData)
completedData := patchCodexCompletedOutput(eventData, outputItemsByIndex, outputItemsFallback)
results, _, usageRaw, _, errExtract := codexExtractImagesFromResponsesCompleted(completedData)
results, _, usageRaw, _, errExtract := codexExtractImageResults(eventData, outputItemsByIndex, outputItemsFallback)
if errExtract != nil {
sendError(errExtract)
return
@@ -578,39 +577,76 @@ func codexMultipartFileToDataURL(fileHeader *multipart.FileHeader) (string, erro
return "data:" + mediaType + ";base64," + base64.StdEncoding.EncodeToString(data), nil
}
func codexExtractImagesFromResponsesCompleted(payload []byte) (results []codexImageCallResult, createdAt int64, usageRaw []byte, firstMeta codexImageCallResult, err error) {
if gjson.GetBytes(payload, "type").String() != "response.completed" {
// codexExtractImageResults extracts image generation results directly from the
// completed event and the items collected from response.output_item.done events,
// without rebuilding the full completed JSON.
//
// It prefers image_generation_call items already present in the completed event's
// response.output and only falls back to the collected items when that output is
// empty — mirroring the semantics of patchCodexCompletedOutput + the previous
// extractor. Skipping the concatenate-and-reparse step avoids two large copies of
// the base64 payload, which matters for multi-megabyte generated images.
func codexExtractImageResults(completed []byte, itemsByIndex map[int64][]byte, fallback [][]byte) (results []codexImageCallResult, createdAt int64, usageRaw []byte, firstMeta codexImageCallResult, err error) {
if gjson.GetBytes(completed, "type").String() != "response.completed" {
return nil, 0, nil, codexImageCallResult{}, fmt.Errorf("unexpected event type")
}
createdAt = gjson.GetBytes(payload, "response.created_at").Int()
createdAt = gjson.GetBytes(completed, "response.created_at").Int()
if createdAt <= 0 {
createdAt = time.Now().Unix()
}
output := gjson.GetBytes(payload, "response.output")
if output.IsArray() {
for _, item := range output.Array() {
if item.Get("type").String() != "image_generation_call" {
continue
appendItem := func(item gjson.Result) {
if item.Get("type").String() != "image_generation_call" {
return
}
res := strings.TrimSpace(item.Get("result").String())
if res == "" {
return
}
entry := codexImageCallResult{
Result: res,
RevisedPrompt: strings.TrimSpace(item.Get("revised_prompt").String()),
OutputFormat: strings.TrimSpace(item.Get("output_format").String()),
Size: strings.TrimSpace(item.Get("size").String()),
Background: strings.TrimSpace(item.Get("background").String()),
Quality: strings.TrimSpace(item.Get("quality").String()),
}
if len(results) == 0 {
firstMeta = entry
}
results = append(results, entry)
}
var outputItems []gjson.Result
if output := gjson.GetBytes(completed, "response.output"); output.Exists() && output.IsArray() {
outputItems = output.Array()
}
if len(outputItems) > 0 {
// Completed event already carries the output; extract from it in place.
results = make([]codexImageCallResult, 0, len(outputItems))
for _, item := range outputItems {
appendItem(item)
}
} else if len(itemsByIndex) > 0 || len(fallback) > 0 {
// Completed output was empty; extract directly from the collected items,
// preserving their original output_index ordering.
results = make([]codexImageCallResult, 0, len(itemsByIndex)+len(fallback))
if len(itemsByIndex) > 0 {
indexes := make([]int64, 0, len(itemsByIndex))
for idx := range itemsByIndex {
indexes = append(indexes, idx)
}
res := strings.TrimSpace(item.Get("result").String())
if res == "" {
continue
sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })
for _, idx := range indexes {
appendItem(gjson.ParseBytes(itemsByIndex[idx]))
}
entry := codexImageCallResult{
Result: res,
RevisedPrompt: strings.TrimSpace(item.Get("revised_prompt").String()),
OutputFormat: strings.TrimSpace(item.Get("output_format").String()),
Size: strings.TrimSpace(item.Get("size").String()),
Background: strings.TrimSpace(item.Get("background").String()),
Quality: strings.TrimSpace(item.Get("quality").String()),
}
if len(results) == 0 {
firstMeta = entry
}
results = append(results, entry)
}
for _, raw := range fallback {
appendItem(gjson.ParseBytes(raw))
}
}
if usage := gjson.GetBytes(payload, "response.tool_usage.image_gen"); usage.Exists() && usage.IsObject() {
if usage := gjson.GetBytes(completed, "response.tool_usage.image_gen"); usage.Exists() && usage.IsObject() {
usageRaw = []byte(usage.Raw)
}
return results, createdAt, usageRaw, firstMeta, nil

View File

@@ -0,0 +1,92 @@
package executor
import (
"testing"
)
// item builds a minimal image_generation_call item JSON.
func imageGenItem(result, format string) []byte {
return []byte(`{"type":"image_generation_call","result":"` + result + `","output_format":"` + format + `"}`)
}
func TestCodexExtractImageResults_FromCompletedOutput(t *testing.T) {
completed := []byte(`{"type":"response.completed","response":{"created_at":111,"output":[` +
string(imageGenItem("AAA", "png")) + `]}}`)
results, createdAt, _, firstMeta, err := codexExtractImageResults(completed, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if createdAt != 111 {
t.Fatalf("createdAt = %d, want 111", createdAt)
}
if len(results) != 1 || results[0].Result != "AAA" {
t.Fatalf("unexpected results: %+v", results)
}
if firstMeta.OutputFormat != "png" {
t.Fatalf("firstMeta.OutputFormat = %q, want png", firstMeta.OutputFormat)
}
}
func TestCodexExtractImageResults_FallbackToCollectedItemsOrdered(t *testing.T) {
// Completed event has an empty output; images arrived via output_item.done.
completed := []byte(`{"type":"response.completed","response":{"created_at":222,"output":[]}}`)
itemsByIndex := map[int64][]byte{
2: imageGenItem("SECOND", "png"),
0: imageGenItem("FIRST", "jpg"),
}
results, createdAt, _, _, err := codexExtractImageResults(completed, itemsByIndex, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if createdAt != 222 {
t.Fatalf("createdAt = %d, want 222", createdAt)
}
if len(results) != 2 {
t.Fatalf("expected 2 results, got %d: %+v", len(results), results)
}
// Ordering must follow output_index (0 before 2).
if results[0].Result != "FIRST" || results[1].Result != "SECOND" {
t.Fatalf("results out of order: %+v", results)
}
}
func TestCodexExtractImageResults_PrefersCompletedOutputOverItems(t *testing.T) {
// When the completed output is non-empty, collected items must be ignored
// (matches the original patchCodexCompletedOutput behaviour).
completed := []byte(`{"type":"response.completed","response":{"created_at":333,"output":[` +
string(imageGenItem("FROM_OUTPUT", "png")) + `]}}`)
itemsByIndex := map[int64][]byte{0: imageGenItem("FROM_ITEMS", "png")}
results, _, _, _, err := codexExtractImageResults(completed, itemsByIndex, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(results) != 1 || results[0].Result != "FROM_OUTPUT" {
t.Fatalf("expected to prefer completed output, got %+v", results)
}
}
func TestCodexExtractImageResults_WrongEventType(t *testing.T) {
if _, _, _, _, err := codexExtractImageResults([]byte(`{"type":"response.in_progress"}`), nil, nil); err == nil {
t.Fatalf("expected error for non-completed event type")
}
}
func TestCodexExtractImageResults_FallbackList(t *testing.T) {
// Items collected without an output_index land in the fallback slice.
completed := []byte(`{"type":"response.completed","response":{"created_at":444}}`)
fallback := [][]byte{imageGenItem("FB", "webp")}
results, _, _, firstMeta, err := codexExtractImageResults(completed, nil, fallback)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(results) != 1 || results[0].Result != "FB" {
t.Fatalf("unexpected fallback results: %+v", results)
}
if firstMeta.OutputFormat != "webp" {
t.Fatalf("firstMeta.OutputFormat = %q, want webp", firstMeta.OutputFormat)
}
}