From bf0e5c23f731e6d80457679e721c535823e5e60e Mon Sep 17 00:00:00 2001 From: 1137043480 <1137043480@users.noreply.github.com> Date: Sun, 3 May 2026 11:25:04 -0400 Subject: [PATCH] fix: prevent goroutine leaks in streaming executors via context-aware channel sends All streaming executors use bare channel sends (out <- chunk) inside goroutines that process upstream SSE responses. When the downstream consumer disconnects (client timeout, network drop, etc.), these sends block indefinitely, causing the goroutine and all associated resources (HTTP response body, scanner buffers, translation state) to leak permanently. Over time, leaked goroutines accumulate monotonically, leading to RSS growth from ~30MB to 3.7GB+ and eventual OOM kills on resource-constrained VPS hosts. Fix: Replace all bare 'out <- ...' sends with: select { case out <- ...: case <-ctx.Done(): return } This ensures goroutines terminate promptly when the request context is canceled, allowing GC to reclaim all associated resources. Affected executors (9 files, 36+ send sites): - antigravity_executor.go (5 sites) - gemini_cli_executor.go (6 sites) - gemini_vertex_executor.go (6 sites) - aistudio_executor.go (4 sites) - gemini_executor.go (3 sites) - openai_compat_executor.go (3 sites) - claude_executor.go (4 sites) - codex_executor.go (2 sites) - kimi_executor.go (3 sites) --- .../runtime/executor/aistudio_executor.go | 22 +++++++++--- .../runtime/executor/antigravity_executor.go | 28 ++++++++++++--- internal/runtime/executor/claude_executor.go | 22 +++++++++--- internal/runtime/executor/codex_executor.go | 11 ++++-- .../runtime/executor/gemini_cli_executor.go | 34 +++++++++++++++---- internal/runtime/executor/gemini_executor.go | 17 ++++++++-- .../executor/gemini_vertex_executor.go | 34 +++++++++++++++---- internal/runtime/executor/kimi_executor.go | 17 ++++++++-- .../executor/openai_compat_executor.go | 17 ++++++++-- 9 files changed, 166 insertions(+), 36 deletions(-) diff --git a/internal/runtime/executor/aistudio_executor.go b/internal/runtime/executor/aistudio_executor.go index 73491d824..37e85377b 100644 --- a/internal/runtime/executor/aistudio_executor.go +++ b/internal/runtime/executor/aistudio_executor.go @@ -285,7 +285,10 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth if event.Err != nil { helps.RecordAPIResponseError(ctx, e.cfg, event.Err) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} + select { + case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: + case <-ctx.Done(): + } return false } switch event.Type { @@ -303,7 +306,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth } lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, filtered, ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])}: + case <-ctx.Done(): + return false + } } break } @@ -319,14 +326,21 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth } lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, event.Payload, ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])}: + case <-ctx.Done(): + return false + } } reporter.Publish(ctx, helps.ParseGeminiUsage(event.Payload)) return false case wsrelay.MessageTypeError: helps.RecordAPIResponseError(ctx, e.cfg, event.Err) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} + select { + case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: + case <-ctx.Done(): + } return false } return true diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 280c799af..c07680e8e 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -894,12 +894,19 @@ attemptLoop: reporter.Publish(ctx, detail) } - out <- cliproxyexecutor.StreamChunk{Payload: payload} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: payload}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { reporter.EnsurePublished(ctx) } @@ -1357,17 +1364,28 @@ attemptLoop: chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(payload), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, []byte("[DONE]"), ¶m) for i := range tail { - out <- cliproxyexecutor.StreamChunk{Payload: tail[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: tail[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { reporter.EnsurePublished(ctx) } diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 66432ac40..ea94526e1 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -484,12 +484,19 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A cloned := make([]byte, len(line)+1) copy(cloned, line) cloned[len(line)] = '\n' - out <- cliproxyexecutor.StreamChunk{Payload: cloned} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: cloned}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } return } @@ -521,13 +528,20 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A ¶m, ) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index aa8223f4f..6efc25b01 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -515,13 +515,20 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, translatedLine, ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index 15e845722..b6210e6a1 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -411,19 +411,30 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if bytes.HasPrefix(line, dataTag) { segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, bytes.Clone(line), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } } } segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, []byte("[DONE]"), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } return } reporter.EnsurePublished(ctx) @@ -434,7 +445,10 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if errRead != nil { helps.RecordAPIResponseError(ctx, e.cfg, errRead) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errRead} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errRead}: + case <-ctx.Done(): + } return } helps.AppendAPIResponseChunk(ctx, e.cfg, data) @@ -442,12 +456,20 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut var param any segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, data, ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } segments = sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, []byte("[DONE]"), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } }(httpResp, append([]byte(nil), payload...), attemptModel) diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index 0e3c3ec6b..2a6e9a6e7 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -324,17 +324,28 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(payload), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/gemini_vertex_executor.go b/internal/runtime/executor/gemini_vertex_executor.go index b147fde97..20f5aec12 100644 --- a/internal/runtime/executor/gemini_vertex_executor.go +++ b/internal/runtime/executor/gemini_vertex_executor.go @@ -656,17 +656,28 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil @@ -786,17 +797,28 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/kimi_executor.go b/internal/runtime/executor/kimi_executor.go index 3588c9624..2bb0c7fda 100644 --- a/internal/runtime/executor/kimi_executor.go +++ b/internal/runtime/executor/kimi_executor.go @@ -290,17 +290,28 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range doneChunks { - out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 4e44a7ae0..ebddfddb1 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -293,20 +293,31 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy // Pass through translator; it yields one or more chunks for the target schema. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(line), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { // In case the upstream close the stream without a terminal [DONE] marker. // Feed a synthetic done marker through the translator so pending // response.completed events are still emitted exactly once. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, []byte("data: [DONE]"), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } // Ensure we record the request if no usage chunk was ever seen