Files
CLIProxyAPI/internal/runtime/executor/openai_compat_executor.go
1137043480 bf0e5c23f7 fix: prevent goroutine leaks in streaming executors via context-aware channel sends
All streaming executors use bare channel sends (out <- chunk) inside goroutines
that process upstream SSE responses. When the downstream consumer disconnects
(client timeout, network drop, etc.), these sends block indefinitely, causing
the goroutine and all associated resources (HTTP response body, scanner buffers,
translation state) to leak permanently.

Over time, leaked goroutines accumulate monotonically, leading to RSS growth
from ~30MB to 3.7GB+ and eventual OOM kills on resource-constrained VPS hosts.

Fix: Replace all bare 'out <- ...' sends with:
  select {
  case out <- ...:
  case <-ctx.Done():
    return
  }

This ensures goroutines terminate promptly when the request context is canceled,
allowing GC to reclaim all associated resources.

Affected executors (9 files, 36+ send sites):
- antigravity_executor.go (5 sites)
- gemini_cli_executor.go (6 sites)
- gemini_vertex_executor.go (6 sites)
- aistudio_executor.go (4 sites)
- gemini_executor.go (3 sites)
- openai_compat_executor.go (3 sites)
- claude_executor.go (4 sites)
- codex_executor.go (2 sites)
- kimi_executor.go (3 sites)
2026-05-03 11:25:04 -04:00

428 lines
15 KiB
Go

package executor
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor/helps"
"github.com/router-for-me/CLIProxyAPI/v6/internal/thinking"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
log "github.com/sirupsen/logrus"
"github.com/tidwall/sjson"
)
// OpenAICompatExecutor implements a stateless executor for OpenAI-compatible providers.
// It performs request/response translation and executes against the provider base URL
// using per-auth credentials (API key) and per-auth HTTP transport (proxy) from context.
type OpenAICompatExecutor struct {
provider string
cfg *config.Config
}
// NewOpenAICompatExecutor creates an executor bound to a provider key (e.g., "openrouter").
func NewOpenAICompatExecutor(provider string, cfg *config.Config) *OpenAICompatExecutor {
return &OpenAICompatExecutor{provider: provider, cfg: cfg}
}
// Identifier implements cliproxyauth.ProviderExecutor.
func (e *OpenAICompatExecutor) Identifier() string { return e.provider }
// PrepareRequest injects OpenAI-compatible credentials into the outgoing HTTP request.
func (e *OpenAICompatExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
if req == nil {
return nil
}
_, apiKey := e.resolveCredentials(auth)
if strings.TrimSpace(apiKey) != "" {
req.Header.Set("Authorization", "Bearer "+apiKey)
}
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(req, attrs)
return nil
}
// HttpRequest injects OpenAI-compatible credentials into the request and executes it.
func (e *OpenAICompatExecutor) HttpRequest(ctx context.Context, auth *cliproxyauth.Auth, req *http.Request) (*http.Response, error) {
if req == nil {
return nil, fmt.Errorf("openai compat executor: request is nil")
}
if ctx == nil {
ctx = req.Context()
}
httpReq := req.WithContext(ctx)
if err := e.PrepareRequest(httpReq, auth); err != nil {
return nil, err
}
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
return httpClient.Do(httpReq)
}
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
baseModel := thinking.ParseSuffix(req.Model).ModelName
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.TrackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" {
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return
}
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
endpoint := "/chat/completions"
if opts.Alt == "responses/compact" {
to = sdktranslator.FromString("openai-response")
endpoint = "/responses/compact"
}
originalPayloadSource := req.Payload
if len(opts.OriginalRequest) > 0 {
originalPayloadSource = opts.OriginalRequest
}
originalPayload := originalPayloadSource
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, opts.Stream)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, opts.Stream)
requestedModel := helps.PayloadRequestedModel(opts, req.Model)
requestPath := helps.PayloadRequestPath(opts)
translated = helps.ApplyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel, requestPath)
if opts.Alt == "responses/compact" {
if updated, errDelete := sjson.DeleteBytes(translated, "stream"); errDelete == nil {
translated = updated
}
}
translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier())
if err != nil {
return resp, err
}
url := strings.TrimSuffix(baseURL, "/") + endpoint
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
if err != nil {
return resp, err
}
httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" {
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
}
httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat")
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
var authID, authLabel, authType, authValue string
if auth != nil {
authID = auth.ID
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: translated,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
}()
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
helps.AppendAPIResponseChunk(ctx, e.cfg, body)
reporter.Publish(ctx, helps.ParseOpenAIUsage(body))
// Ensure we at least record the request even if upstream doesn't return usage
reporter.EnsurePublished(ctx)
// Translate response back to source format when needed
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, body, &param)
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
return resp, nil
}
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
baseModel := thinking.ParseSuffix(req.Model).ModelName
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.TrackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" {
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return nil, err
}
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
originalPayloadSource := req.Payload
if len(opts.OriginalRequest) > 0 {
originalPayloadSource = opts.OriginalRequest
}
originalPayload := originalPayloadSource
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, true)
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true)
requestedModel := helps.PayloadRequestedModel(opts, req.Model)
requestPath := helps.PayloadRequestPath(opts)
translated = helps.ApplyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel, requestPath)
translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier())
if err != nil {
return nil, err
}
// Request usage data in the final streaming chunk so that token statistics
// are captured even when the upstream is an OpenAI-compatible provider.
translated, _ = sjson.SetBytes(translated, "stream_options.include_usage", true)
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" {
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
}
httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat")
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
httpReq.Header.Set("Accept", "text/event-stream")
httpReq.Header.Set("Cache-Control", "no-cache")
var authID, authLabel, authType, authValue string
if auth != nil {
authID = auth.ID
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: translated,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
go func() {
defer close(out)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
scanner.Buffer(nil, 52_428_800) // 50MB
var param any
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := helps.ParseOpenAIStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
if len(line) == 0 {
continue
}
if !bytes.HasPrefix(line, []byte("data:")) {
continue
}
// OpenAI-compatible streams are SSE: lines typically prefixed with "data: ".
// Pass through translator; it yields one or more chunks for the target schema.
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(line), &param)
for i := range chunks {
select {
case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}:
case <-ctx.Done():
return
}
}
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
}
} else {
// In case the upstream close the stream without a terminal [DONE] marker.
// Feed a synthetic done marker through the translator so pending
// response.completed events are still emitted exactly once.
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, []byte("data: [DONE]"), &param)
for i := range chunks {
select {
case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}:
case <-ctx.Done():
return
}
}
}
// Ensure we record the request if no usage chunk was ever seen
reporter.EnsurePublished(ctx)
}()
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
}
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
baseModel := thinking.ParseSuffix(req.Model).ModelName
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, false)
modelForCounting := baseModel
translated, err := thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier())
if err != nil {
return cliproxyexecutor.Response{}, err
}
enc, err := helps.TokenizerForModel(modelForCounting)
if err != nil {
return cliproxyexecutor.Response{}, fmt.Errorf("openai compat executor: tokenizer init failed: %w", err)
}
count, err := helps.CountOpenAIChatTokens(enc, translated)
if err != nil {
return cliproxyexecutor.Response{}, fmt.Errorf("openai compat executor: token counting failed: %w", err)
}
usageJSON := helps.BuildOpenAIUsageJSON(count)
translatedUsage := sdktranslator.TranslateTokenCount(ctx, to, from, count, usageJSON)
return cliproxyexecutor.Response{Payload: translatedUsage}, nil
}
// Refresh is a no-op for API-key based compatibility providers.
func (e *OpenAICompatExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
log.Debugf("openai compat executor: refresh called")
_ = ctx
return auth, nil
}
func (e *OpenAICompatExecutor) resolveCredentials(auth *cliproxyauth.Auth) (baseURL, apiKey string) {
if auth == nil {
return "", ""
}
if auth.Attributes != nil {
baseURL = strings.TrimSpace(auth.Attributes["base_url"])
apiKey = strings.TrimSpace(auth.Attributes["api_key"])
}
return
}
func (e *OpenAICompatExecutor) resolveCompatConfig(auth *cliproxyauth.Auth) *config.OpenAICompatibility {
if auth == nil || e.cfg == nil {
return nil
}
candidates := make([]string, 0, 3)
if auth.Attributes != nil {
if v := strings.TrimSpace(auth.Attributes["compat_name"]); v != "" {
candidates = append(candidates, v)
}
if v := strings.TrimSpace(auth.Attributes["provider_key"]); v != "" {
candidates = append(candidates, v)
}
}
if v := strings.TrimSpace(auth.Provider); v != "" {
candidates = append(candidates, v)
}
for i := range e.cfg.OpenAICompatibility {
compat := &e.cfg.OpenAICompatibility[i]
if compat.Disabled {
continue
}
for _, candidate := range candidates {
if candidate != "" && strings.EqualFold(strings.TrimSpace(candidate), compat.Name) {
return compat
}
}
}
return nil
}
func (e *OpenAICompatExecutor) overrideModel(payload []byte, model string) []byte {
if len(payload) == 0 || model == "" {
return payload
}
payload, _ = sjson.SetBytes(payload, "model", model)
return payload
}
type statusErr struct {
code int
msg string
retryAfter *time.Duration
}
func (e statusErr) Error() string {
if e.msg != "" {
return e.msg
}
return fmt.Sprintf("status %d", e.code)
}
func (e statusErr) StatusCode() int { return e.code }
func (e statusErr) RetryAfter() *time.Duration { return e.retryAfter }