diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 1ceb0f73..a4156302 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -148,87 +148,108 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return resp, err - } +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return resp, errDo + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } + lastStatus = 0 + lastBody = nil + lastErr = errDo + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errDo + return resp, err } - err = errDo - return resp, err + + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return resp, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return resp, err + } + + reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + return resp, nil } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - err = errRead - return resp, err - } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return resp, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) - resp = cliproxyexecutor.Response{Payload: []byte(converted)} - reporter.ensurePublished(ctx) - return resp, nil + return resp, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return resp, err } @@ -268,150 +289,171 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return resp, err - } +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return resp, errDo + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - err = errDo - return resp, err - } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { - err = errRead - return resp, err - } - if errCtx := ctx.Err(); errCtx != nil { - err = errCtx - return resp, err + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo } lastStatus = 0 lastBody = nil - lastErr = errRead + lastErr = errDo if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - err = errRead + err = errDo return resp, err } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return resp, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return resp, err + } + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return resp, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return resp, err } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + + out := make(chan cliproxyexecutor.StreamChunk) + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) + + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + out <- cliproxyexecutor.StreamChunk{Payload: payload} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + + var buffer bytes.Buffer + for chunk := range out { + if chunk.Err != nil { + return resp, chunk.Err + } + if len(chunk.Payload) > 0 { + _, _ = buffer.Write(chunk.Payload) + _, _ = buffer.Write([]byte("\n")) + } + } + resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())} + + reporter.publish(ctx, parseAntigravityUsage(resp.Payload)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + + return resp, nil + } + + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return resp, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - out := make(chan cliproxyexecutor.StreamChunk) - go func(resp *http.Response) { - defer close(out) - defer func() { - if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - }() - scanner := bufio.NewScanner(resp.Body) - scanner.Buffer(nil, streamScannerBuffer) - for scanner.Scan() { - line := scanner.Bytes() - appendAPIResponseChunk(ctx, e.cfg, line) - - // Filter usage metadata for all models - // Only retain usage statistics in the terminal chunk - line = FilterSSEUsageMetadata(line) - - payload := jsonPayload(line) - if payload == nil { - continue - } - - if detail, ok := parseAntigravityStreamUsage(payload); ok { - reporter.publish(ctx, detail) - } - - out <- cliproxyexecutor.StreamChunk{Payload: payload} - } - if errScan := scanner.Err(); errScan != nil { - recordAPIResponseError(ctx, e.cfg, errScan) - reporter.publishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } else { - reporter.ensurePublished(ctx) - } - }(httpResp) - - var buffer bytes.Buffer - for chunk := range out { - if chunk.Err != nil { - return resp, chunk.Err - } - if len(chunk.Payload) > 0 { - _, _ = buffer.Write(chunk.Payload) - _, _ = buffer.Write([]byte("\n")) - } - } - resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())} - - reporter.publish(ctx, parseAntigravityUsage(resp.Payload)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m) - resp = cliproxyexecutor.Response{Payload: []byte(converted)} - reporter.ensurePublished(ctx) - - return resp, nil + return resp, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return resp, err } @@ -635,139 +677,160 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return nil, err - } - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return nil, errDo +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error + + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return nil, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - err = errDo - return nil, err - } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { - err = errRead - return nil, err - } - if errCtx := ctx.Err(); errCtx != nil { - err = errCtx - return nil, err + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil, errDo } lastStatus = 0 lastBody = nil - lastErr = errRead + lastErr = errDo if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - err = errRead + err = errDo return nil, err } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return nil, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return nil, err + } + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return nil, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return nil, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return nil, err } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + + out := make(chan cliproxyexecutor.StreamChunk) + stream = out + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + var param any + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) + + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) + for i := range chunks { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + } + } + tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) + for i := range tail { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + return stream, nil + } + + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return nil, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - out := make(chan cliproxyexecutor.StreamChunk) - stream = out - go func(resp *http.Response) { - defer close(out) - defer func() { - if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - }() - scanner := bufio.NewScanner(resp.Body) - scanner.Buffer(nil, streamScannerBuffer) - var param any - for scanner.Scan() { - line := scanner.Bytes() - appendAPIResponseChunk(ctx, e.cfg, line) - - // Filter usage metadata for all models - // Only retain usage statistics in the terminal chunk - line = FilterSSEUsageMetadata(line) - - payload := jsonPayload(line) - if payload == nil { - continue - } - - if detail, ok := parseAntigravityStreamUsage(payload); ok { - reporter.publish(ctx, detail) - } - - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) - for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} - } - } - tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) - for i := range tail { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} - } - if errScan := scanner.Err(); errScan != nil { - recordAPIResponseError(ctx, e.cfg, errScan) - reporter.publishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } else { - reporter.ensurePublished(ctx) - } - }(httpResp) - return stream, nil + return nil, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return nil, err } @@ -1384,14 +1447,65 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string { return defaultAntigravityAgent } +func antigravityRetryAttempts(cfg *config.Config) int { + if cfg == nil { + return 1 + } + retry := cfg.RequestRetry + if retry < 0 { + retry = 0 + } + attempts := retry + 1 + if attempts < 1 { + return 1 + } + return attempts +} + +func antigravityShouldRetryNoCapacity(statusCode int, body []byte) bool { + if statusCode != http.StatusServiceUnavailable { + return false + } + if len(body) == 0 { + return false + } + msg := strings.ToLower(string(body)) + return strings.Contains(msg, "no capacity available") +} + +func antigravityNoCapacityRetryDelay(attempt int) time.Duration { + if attempt < 0 { + attempt = 0 + } + delay := time.Duration(attempt+1) * 250 * time.Millisecond + if delay > 2*time.Second { + delay = 2 * time.Second + } + return delay +} + +func antigravityWait(ctx context.Context, wait time.Duration) error { + if wait <= 0 { + return nil + } + timer := time.NewTimer(wait) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string { if base := resolveCustomAntigravityBaseURL(auth); base != "" { return []string{base} } return []string{ - antigravitySandboxBaseURLDaily, antigravityBaseURLDaily, - antigravityBaseURLProd, + antigravitySandboxBaseURLDaily, + // antigravityBaseURLProd, } } diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 9c291328..170ebb90 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -163,7 +163,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("response body close error: %v", errClose) @@ -295,7 +295,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("response body close error: %v", errClose) } diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 4be560bf..1f368b84 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -150,7 +150,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -265,7 +265,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au return nil, readErr } appendAPIResponseChunk(ctx, e.cfg, data) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) err = statusErr{code: httpResp.StatusCode, msg: string(data)} return nil, err } diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index 82d1aa84..e8a244ab 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -227,7 +227,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), data...) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) if httpResp.StatusCode == 429 { if idx+1 < len(models) { log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1]) @@ -360,7 +360,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut appendAPIResponseChunk(ctx, e.cfg, data) lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), data...) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) if httpResp.StatusCode == 429 { if idx+1 < len(models) { log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1]) diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index 30f9fd87..58bd71a2 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -188,7 +188,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -282,7 +282,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("gemini executor: close response body error: %v", errClose) } @@ -402,7 +402,7 @@ func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut } appendAPIResponseChunk(ctx, e.cfg, data) if resp.StatusCode < 200 || resp.StatusCode >= 300 { - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, summarizeErrorBody(resp.Header.Get("Content-Type"), data)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", resp.StatusCode, summarizeErrorBody(resp.Header.Get("Content-Type"), data)) return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(data)} } diff --git a/internal/runtime/executor/gemini_vertex_executor.go b/internal/runtime/executor/gemini_vertex_executor.go index 6a46d1f6..ceea42ff 100644 --- a/internal/runtime/executor/gemini_vertex_executor.go +++ b/internal/runtime/executor/gemini_vertex_executor.go @@ -389,7 +389,7 @@ func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, au if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -503,7 +503,7 @@ func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *clip if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -601,7 +601,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } @@ -725,7 +725,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("vertex executor: close response body error: %v", errClose) } @@ -838,7 +838,7 @@ func (e *GeminiVertexExecutor) countTokensWithServiceAccount(ctx context.Context if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)} } data, errRead := io.ReadAll(httpResp.Body) @@ -922,7 +922,7 @@ func (e *GeminiVertexExecutor) countTokensWithAPIKey(ctx context.Context, auth * if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)} } data, errRead := io.ReadAll(httpResp.Body) diff --git a/internal/runtime/executor/iflow_executor.go b/internal/runtime/executor/iflow_executor.go index 651fca2f..270f5aa4 100644 --- a/internal/runtime/executor/iflow_executor.go +++ b/internal/runtime/executor/iflow_executor.go @@ -142,7 +142,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -244,7 +244,7 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au log.Errorf("iflow executor: close response body error: %v", errClose) } appendAPIResponseChunk(ctx, e.cfg, data) - log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) + logWithRequestID(ctx).Debugf("request error, error status: %d error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) err = statusErr{code: httpResp.StatusCode, msg: string(data)} return nil, err } diff --git a/internal/runtime/executor/logging_helpers.go b/internal/runtime/executor/logging_helpers.go index 90532772..e9876243 100644 --- a/internal/runtime/executor/logging_helpers.go +++ b/internal/runtime/executor/logging_helpers.go @@ -12,7 +12,10 @@ import ( "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) const ( @@ -332,6 +335,12 @@ func summarizeErrorBody(contentType string, body []byte) string { } return "[html body omitted]" } + + // Try to extract error message from JSON response + if message := extractJSONErrorMessage(body); message != "" { + return message + } + return string(body) } @@ -358,3 +367,25 @@ func extractHTMLTitle(body []byte) string { } return strings.Join(strings.Fields(title), " ") } + +// extractJSONErrorMessage attempts to extract error.message from JSON error responses +func extractJSONErrorMessage(body []byte) string { + result := gjson.GetBytes(body, "error.message") + if result.Exists() && result.String() != "" { + return result.String() + } + return "" +} + +// logWithRequestID returns a logrus Entry with request_id field populated from context. +// If no request ID is found in context, it returns the standard logger. +func logWithRequestID(ctx context.Context) *log.Entry { + if ctx == nil { + return log.NewEntry(log.StandardLogger()) + } + requestID := logging.GetRequestID(ctx) + if requestID == "" { + return log.NewEntry(log.StandardLogger()) + } + return log.WithField("request_id", requestID) +} diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 480137b6..85df21b1 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -146,7 +146,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -239,7 +239,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("openai compat executor: close response body error: %v", errClose) } diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index 6d6dac78..d05579d4 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -133,7 +133,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } @@ -222,7 +222,7 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) }