Files
CLIProxyAPI/internal/runtime/executor/helps/logging_helpers.go
Luis Pater 5753d1a089 feat(logging): enable file-backed request/response sources for enhanced API logging
- Introduced support for file-backed logging of API requests and responses to handle large payloads efficiently.
- Refactored `attachWebsocketLogSources` to `attachRequestLogSources` for broader request and response handling.
- Added new methods for appending request/response data to file-backed sources and updated existing logging workflows for compatibility.
- Improved cleanup and merge logic for file-backed sources during request processing.
- Updated tests to cover newly introduced file-backed logging functionality.
2026-06-05 01:48:05 +08:00

711 lines
20 KiB
Go

package helps
import (
"bytes"
"context"
"fmt"
"html"
"net/http"
"net/url"
"sort"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
"github.com/router-for-me/CLIProxyAPI/v7/internal/util"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// Gin context keys under which this file stores request-scoped logging state.
const (
// apiAttemptsKey stores the []*upstreamAttempt slice recorded for the current request.
apiAttemptsKey = "API_UPSTREAM_ATTEMPTS"
// apiRequestKey stores the aggregated in-memory upstream request log text as []byte.
apiRequestKey = "API_REQUEST"
// apiResponseKey stores the aggregated in-memory upstream response log text as []byte.
apiResponseKey = "API_RESPONSE"
// apiWebsocketTimelineKey stores the in-memory websocket event timeline as []byte.
apiWebsocketTimelineKey = "API_WEBSOCKET_TIMELINE"
// creditsUsedKey stores the bool flag written by MarkCreditsUsed and read by CreditsUsed.
creditsUsedKey = "__antigravity_credits_used__"
)
// UpstreamRequestLog captures the outbound upstream request details for logging.
type UpstreamRequestLog struct {
// URL is the upstream endpoint; RecordAPIRequest logs "<unknown>" when it is empty.
URL string
// Method is the HTTP method; it is omitted from the log when empty.
Method string
// Headers are the outbound request headers; values pass through
// util.MaskSensitiveHeaderValue before being written.
Headers http.Header
// Body is the raw request payload; an empty body is logged as "<empty>".
Body []byte
// Provider, AuthID and AuthLabel identify the credential in the "Auth:" line;
// each is trimmed and skipped when blank.
Provider string
AuthID string
AuthLabel string
// AuthType selects how the credential is rendered (compared case-insensitively
// against "api_key" and "oauth"; any other non-empty value is printed as-is).
AuthType string
// AuthValue is the credential value. It is passed through util.HideAPIKey for
// "api_key", never printed for "oauth", and printed verbatim for other types.
AuthValue string
}
// upstreamAttempt tracks the log state of a single upstream request/response
// exchange stored on the Gin context under apiAttemptsKey.
type upstreamAttempt struct {
// index is the 1-based attempt number used in the "=== API REQUEST/RESPONSE n ===" banners.
index int
// request is the rendered request text kept in memory; it stays empty when the
// text was appended to the file-backed request source instead.
request string
// response accumulates response text in memory when no file-backed source is
// available or an append to it failed.
response *strings.Builder
// responseSource is the file-backed destination for response text, or nil.
responseSource *logging.FileBodySource
// responseIntroWritten is set once the response banner and timestamp were emitted.
responseIntroWritten bool
// statusWritten is set once the "Status:" line was emitted.
statusWritten bool
// headersWritten is set once the "Headers:" section was emitted.
headersWritten bool
// bodyStarted is set once the "Body:" marker was emitted.
bodyStarted bool
// bodyHasContent is set once at least one non-empty chunk was appended.
bodyHasContent bool
// prevWasSSEEvent records whether the previous chunk began with "event:", so a
// following "data:" chunk is separated by a single newline.
prevWasSSEEvent bool
// errorWritten is set once an "Error:" line was emitted for this attempt.
errorWritten bool
}
// requestLogCaptureEnabled reports whether upstream request/response capture
// should run: cfg must be non-nil, have RequestLog set, and not be in
// CommercialMode.
func requestLogCaptureEnabled(cfg *config.Config) bool {
	if cfg == nil || cfg.CommercialMode {
		return false
	}
	return cfg.RequestLog
}
// RecordAPIRequest renders the outbound upstream request (banner, timestamp,
// URL, method, auth summary, headers and body) and registers it as a new
// attempt on the Gin context carried by ctx.
//
// When a file-backed request source is present on the Gin context the text is
// appended there and the attempt keeps no in-memory copy. If no such source
// exists, or the first append to it fails, the full text is kept in memory and
// the aggregated request value is refreshed. Nothing happens when capture is
// disabled or ctx carries no Gin context.
func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequestLog) {
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	attempts := getAttempts(ginCtx)
	index := len(attempts) + 1

	// Everything up to and including the "Body:" marker.
	var preamble strings.Builder
	fmt.Fprintf(&preamble, "=== API REQUEST %d ===\n", index)
	fmt.Fprintf(&preamble, "Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))
	if info.URL == "" {
		preamble.WriteString("Upstream URL: <unknown>\n")
	} else {
		fmt.Fprintf(&preamble, "Upstream URL: %s\n", info.URL)
	}
	if info.Method != "" {
		fmt.Fprintf(&preamble, "HTTP Method: %s\n", info.Method)
	}
	if auth := formatAuthInfo(info); auth != "" {
		fmt.Fprintf(&preamble, "Auth: %s\n", auth)
	}
	preamble.WriteString("\nHeaders:\n")
	writeHeaders(&preamble, info.Headers)
	preamble.WriteString("\nBody:\n")

	hasBody := len(info.Body) > 0
	storedInFile := false
	if source, ok := apiRequestSource(ginCtx); ok {
		errWrite := source.AppendBytes([]byte(preamble.String()))
		if errWrite != nil {
			log.WithError(errWrite).Warn("failed to append api request log part")
		} else {
			// The preamble is on disk; later failures are only logged so the
			// text is never duplicated in memory.
			storedInFile = true
			if hasBody {
				if errBody := source.AppendBytes(info.Body); errBody != nil {
					log.WithError(errBody).Warn("failed to append api request body log part")
				}
			} else if errEmpty := source.AppendBytes([]byte("<empty>")); errEmpty != nil {
				log.WithError(errEmpty).Warn("failed to append empty api request log part")
			}
			if errEnd := source.AppendBytes([]byte("\n\n")); errEnd != nil {
				log.WithError(errEnd).Warn("failed to append api request log terminator")
			}
		}
	}

	requestText := ""
	if !storedInFile {
		if hasBody {
			preamble.Write(info.Body)
		} else {
			preamble.WriteString("<empty>")
		}
		preamble.WriteString("\n\n")
		requestText = preamble.String()
	}

	attempts = append(attempts, &upstreamAttempt{
		index:          index,
		request:        requestText,
		response:       &strings.Builder{},
		responseSource: apiResponseSourceOrNil(ginCtx),
	})
	ginCtx.Set(apiAttemptsKey, attempts)
	if requestText != "" {
		updateAggregatedRequest(ginCtx, attempts)
	}
}
// RecordAPIResponseMetadata writes the upstream status line and headers into
// the log of the latest attempt, each at most once per attempt.
//
// logging.SetResponseHeaders is always called first, even when request log
// capture is disabled. A non-positive status is not written.
func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) {
	logging.SetResponseHeaders(ctx, headers)
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	attempts, current := ensureAttempt(ginCtx)
	ensureResponseIntro(ginCtx, current)

	if !current.statusWritten && status > 0 {
		statusLine := fmt.Sprintf("Status: %d\n", status)
		writeAttemptResponse(ginCtx, current, []byte(statusLine))
		current.statusWritten = true
	}

	if !current.headersWritten {
		var section strings.Builder
		section.WriteString("Headers:\n")
		writeHeaders(&section, headers)
		writeAttemptResponse(ginCtx, current, []byte(section.String()))
		current.headersWritten = true
		// Blank line separating the headers from whatever follows.
		writeAttemptResponse(ginCtx, current, []byte("\n"))
	}

	updateAggregatedResponseIfMemoryBacked(ginCtx, attempts)
}
// RecordAPIResponseError appends an "Error: ..." line to the latest attempt's
// response log. It is a no-op when err is nil, capture is disabled, or ctx
// carries no Gin context. Consecutive errors on one attempt are separated by a
// blank line.
func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) {
	if err == nil || !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	attempts, current := ensureAttempt(ginCtx)
	ensureResponseIntro(ginCtx, current)

	// A body that was opened but never received content is treated as not
	// started, so the empty "Body:" marker does not count once an error lands.
	current.bodyStarted = current.bodyStarted && current.bodyHasContent

	if current.errorWritten {
		writeAttemptResponse(ginCtx, current, []byte("\n"))
	}
	errorLine := "Error: " + err.Error() + "\n"
	writeAttemptResponse(ginCtx, current, []byte(errorLine))
	current.errorWritten = true

	updateAggregatedResponseIfMemoryBacked(ginCtx, attempts)
}
// AppendAPIResponseChunk appends one upstream response chunk to the latest
// attempt's response log.
//
// The chunk is whitespace-trimmed and ignored when empty. If no headers were
// recorded yet, a placeholder headers section is written first; the "Body:"
// marker is written before the first chunk. Chunks are separated by a blank
// line, except that a chunk starting with "data:" directly after one starting
// with "event:" is separated by a single newline.
func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byte) {
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	data := bytes.TrimSpace(chunk)
	if len(data) == 0 {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	attempts, current := ensureAttempt(ginCtx)
	ensureResponseIntro(ginCtx, current)

	if !current.headersWritten {
		var section strings.Builder
		section.WriteString("Headers:\n")
		writeHeaders(&section, nil)
		writeAttemptResponse(ginCtx, current, []byte(section.String()))
		current.headersWritten = true
		writeAttemptResponse(ginCtx, current, []byte("\n"))
	}
	if !current.bodyStarted {
		writeAttemptResponse(ginCtx, current, []byte("Body:\n"))
		current.bodyStarted = true
	}

	isSSEEvent := bytes.HasPrefix(data, []byte("event:"))
	isSSEData := bytes.HasPrefix(data, []byte("data:"))

	if current.bodyHasContent {
		separator := []byte("\n\n")
		if isSSEData && current.prevWasSSEEvent {
			// Keep an SSE "event:" line and its "data:" line adjacent.
			separator = []byte("\n")
		}
		writeAttemptResponse(ginCtx, current, separator)
	}
	writeAttemptResponse(ginCtx, current, data)
	current.bodyHasContent = true
	current.prevWasSSEEvent = isSSEEvent

	updateAggregatedResponseIfMemoryBacked(ginCtx, attempts)
}
// RecordAPIWebsocketRequest appends an "api.websocket.request" event
// (timestamp, optional URL and auth summary, headers, body) to the websocket
// timeline of the Gin context carried by ctx. An empty body is rendered as
// "<empty>".
func RecordAPIWebsocketRequest(ctx context.Context, cfg *config.Config, info UpstreamRequestLog) {
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	var event strings.Builder
	fmt.Fprintf(&event, "Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))
	event.WriteString("Event: api.websocket.request\n")
	if info.URL != "" {
		fmt.Fprintf(&event, "Upstream URL: %s\n", info.URL)
	}
	if auth := formatAuthInfo(info); auth != "" {
		fmt.Fprintf(&event, "Auth: %s\n", auth)
	}
	event.WriteString("Headers:\n")
	writeHeaders(&event, info.Headers)
	event.WriteString("\nBody:\n")
	if len(info.Body) == 0 {
		event.WriteString("<empty>")
	} else {
		event.Write(info.Body)
	}
	event.WriteString("\n")

	appendAPIWebsocketTimeline(ginCtx, []byte(event.String()))
}
// RecordAPIWebsocketHandshake appends an "api.websocket.handshake" event with
// the handshake status (when positive) and response headers to the websocket
// timeline. logging.SetResponseHeaders is always called first, even when
// capture is disabled.
func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status int, headers http.Header) {
	logging.SetResponseHeaders(ctx, headers)
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}

	var event strings.Builder
	fmt.Fprintf(&event, "Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))
	event.WriteString("Event: api.websocket.handshake\n")
	if status > 0 {
		fmt.Fprintf(&event, "Status: %d\n", status)
	}
	event.WriteString("Headers:\n")
	writeHeaders(&event, headers)
	event.WriteString("\n")

	appendAPIWebsocketTimeline(ginCtx, []byte(event.String()))
}
// RecordAPIWebsocketUpgradeRejection logs a rejected websocket upgrade as a
// regular HTTP attempt: the request, then the response status/headers, then
// the response body. logging.SetResponseHeaders is always called first, even
// when capture is disabled.
func RecordAPIWebsocketUpgradeRejection(ctx context.Context, cfg *config.Config, info UpstreamRequestLog, status int, headers http.Header, body []byte) {
	logging.SetResponseHeaders(ctx, headers)
	if !requestLogCaptureEnabled(cfg) || ginContextFrom(ctx) == nil {
		return
	}

	RecordAPIRequest(ctx, cfg, info)
	RecordAPIResponseMetadata(ctx, cfg, status, headers)
	AppendAPIResponseChunk(ctx, cfg, body)
}
// WebsocketUpgradeRequestURL maps a websocket URL to the HTTP URL of its
// upgrade handshake for logging: "ws" becomes "http" and "wss" becomes
// "https"; other schemes are left as they are.
//
// The input is whitespace-trimmed first. A blank input yields "", and an input
// that fails to parse is returned trimmed but otherwise unchanged.
func WebsocketUpgradeRequestURL(rawURL string) string {
	candidate := strings.TrimSpace(rawURL)
	if candidate == "" {
		return ""
	}
	u, errParse := url.Parse(candidate)
	if errParse != nil {
		return candidate
	}
	if scheme := strings.ToLower(u.Scheme); scheme == "ws" {
		u.Scheme = "http"
	} else if scheme == "wss" {
		u.Scheme = "https"
	}
	return u.String()
}
// AppendAPIWebsocketResponse appends an "api.websocket.response" event holding
// the whitespace-trimmed payload to the websocket timeline. Empty payloads are
// ignored. The response timestamp on the Gin context is marked before the
// event is written.
func AppendAPIWebsocketResponse(ctx context.Context, cfg *config.Config, payload []byte) {
	if !requestLogCaptureEnabled(cfg) {
		return
	}
	frame := bytes.TrimSpace(payload)
	if len(frame) == 0 {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}
	markAPIResponseTimestamp(ginCtx)

	var event strings.Builder
	fmt.Fprintf(&event, "Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))
	event.WriteString("Event: api.websocket.response\n")
	event.Write(frame)
	event.WriteString("\n")

	appendAPIWebsocketTimeline(ginCtx, []byte(event.String()))
}
// RecordAPIWebsocketError appends an "api.websocket.error" event to the
// websocket timeline. The trimmed stage is included when non-blank. It is a
// no-op when err is nil, capture is disabled, or ctx carries no Gin context.
func RecordAPIWebsocketError(ctx context.Context, cfg *config.Config, stage string, err error) {
	if err == nil || !requestLogCaptureEnabled(cfg) {
		return
	}
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return
	}
	markAPIResponseTimestamp(ginCtx)

	var event strings.Builder
	fmt.Fprintf(&event, "Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))
	event.WriteString("Event: api.websocket.error\n")
	if label := strings.TrimSpace(stage); label != "" {
		fmt.Fprintf(&event, "Stage: %s\n", label)
	}
	fmt.Fprintf(&event, "Error: %s\n", err.Error())

	appendAPIWebsocketTimeline(ginCtx, []byte(event.String()))
}
// ginContextFrom returns the *gin.Context stored in ctx under the "gin" key.
// It returns nil when ctx is nil, when no value is stored under that key, or
// when the stored value is not a *gin.Context.
func ginContextFrom(ctx context.Context) *gin.Context {
	// Calling Value on a nil context.Context interface would panic; treat it
	// like a context without a Gin value so every caller's nil check applies.
	if ctx == nil {
		return nil
	}
	ginCtx, _ := ctx.Value("gin").(*gin.Context)
	return ginCtx
}
// getAttempts returns the attempt slice stored on ginCtx under
// apiAttemptsKey, or nil when ginCtx is nil, the key is absent, or the stored
// value has a different type.
func getAttempts(ginCtx *gin.Context) []*upstreamAttempt {
	if ginCtx == nil {
		return nil
	}
	stored, found := ginCtx.Get(apiAttemptsKey)
	if !found {
		return nil
	}
	// A failed assertion leaves attempts nil, which is the desired result.
	attempts, _ := stored.([]*upstreamAttempt)
	return attempts
}
// ensureAttempt returns the recorded attempts together with the latest one.
//
// When no attempt exists yet (a response arrived without a recorded request)
// it creates attempt 1 with a "<missing>" request placeholder. The placeholder
// goes to the file-backed request source when one is available; it is kept in
// memory, and the aggregated request refreshed, when there is no such source
// or the append fails.
func ensureAttempt(ginCtx *gin.Context) ([]*upstreamAttempt, *upstreamAttempt) {
	if existing := getAttempts(ginCtx); len(existing) > 0 {
		return existing, existing[len(existing)-1]
	}

	const missingRequest = "=== API REQUEST 1 ===\n<missing>\n\n"

	placeholder := &upstreamAttempt{
		index:          1,
		response:       &strings.Builder{},
		responseSource: apiResponseSourceOrNil(ginCtx),
	}
	source, ok := apiRequestSource(ginCtx)
	if !ok {
		placeholder.request = missingRequest
	} else if errWrite := source.AppendBytes([]byte(missingRequest)); errWrite != nil {
		log.WithError(errWrite).Warn("failed to append missing api request log part")
		placeholder.request = missingRequest
	}

	attempts := []*upstreamAttempt{placeholder}
	ginCtx.Set(apiAttemptsKey, attempts)
	if placeholder.request != "" {
		updateAggregatedRequest(ginCtx, attempts)
	}
	return attempts, placeholder
}
// ensureResponseIntro writes the response banner, a timestamp line and a blank
// line for attempt, once. It does nothing when attempt is nil, has no
// in-memory response builder, or already has its intro written.
func ensureResponseIntro(ginCtx *gin.Context, attempt *upstreamAttempt) {
	if attempt == nil || attempt.response == nil || attempt.responseIntroWritten {
		return
	}
	banner := fmt.Sprintf("=== API RESPONSE %d ===\n", attempt.index)
	writeAttemptResponse(ginCtx, attempt, []byte(banner))
	stamp := "Timestamp: " + time.Now().Format(time.RFC3339Nano) + "\n"
	writeAttemptResponse(ginCtx, attempt, []byte(stamp))
	writeAttemptResponse(ginCtx, attempt, []byte("\n"))
	attempt.responseIntroWritten = true
}
// writeAttemptResponse appends payload to the response log of attempt.
//
// The file-backed response source is preferred: it is looked up from ginCtx
// when the attempt has none cached, and a successful append marks
// logging.APIResponseCapturedContextKey on the context. When the append fails
// the cached source is cleared and the payload goes to the in-memory builder
// instead. Nil attempts and empty payloads are ignored.
//
// NOTE(review): because a failed append clears the cached source, the next
// call looks the source up from the context again, so later payloads may go
// back to the file while this one stays in memory — confirm this is intended.
func writeAttemptResponse(ginCtx *gin.Context, attempt *upstreamAttempt, payload []byte) {
	if attempt == nil || len(payload) == 0 {
		return
	}
	if attempt.responseSource == nil {
		attempt.responseSource = apiResponseSourceOrNil(ginCtx)
	}
	if source := attempt.responseSource; source != nil {
		errWrite := source.AppendBytes(payload)
		if errWrite == nil {
			if ginCtx != nil {
				ginCtx.Set(logging.APIResponseCapturedContextKey, true)
			}
			return
		}
		log.WithError(errWrite).Warn("failed to append api response log part")
		attempt.responseSource = nil
	}

	// In-memory fallback.
	if attempt.response == nil {
		attempt.response = &strings.Builder{}
	}
	attempt.response.Write(payload)
}
// updateAggregatedRequest concatenates the in-memory request text of every
// attempt, in order, and stores the result as []byte on ginCtx under
// apiRequestKey. Attempts whose request text is empty contribute nothing. It
// is a no-op when ginCtx is nil.
func updateAggregatedRequest(ginCtx *gin.Context, attempts []*upstreamAttempt) {
	if ginCtx == nil {
		return
	}
	var builder strings.Builder
	for _, attempt := range attempts {
		// Skip nil entries, matching updateAggregatedResponse, rather than
		// panicking on a nil dereference.
		if attempt == nil {
			continue
		}
		builder.WriteString(attempt.request)
	}
	ginCtx.Set(apiRequestKey, []byte(builder.String()))
}
// updateAggregatedResponseIfMemoryBacked refreshes the aggregated in-memory
// response only when the Gin context has no file-backed response source.
func updateAggregatedResponseIfMemoryBacked(ginCtx *gin.Context, attempts []*upstreamAttempt) {
	if apiResponseSourceOrNil(ginCtx) == nil {
		updateAggregatedResponse(ginCtx, attempts)
	}
}
// updateAggregatedResponse joins the non-empty in-memory response text of all
// attempts and stores it as []byte on ginCtx under apiResponseKey.
//
// Each included response is newline-terminated, and every included response
// except the one at the last slice position is followed by an extra blank
// line. Nil attempts and attempts without response text are skipped. It is a
// no-op when ginCtx is nil.
func updateAggregatedResponse(ginCtx *gin.Context, attempts []*upstreamAttempt) {
	if ginCtx == nil {
		return
	}
	lastIdx := len(attempts) - 1
	var combined strings.Builder
	for idx, attempt := range attempts {
		if attempt == nil || attempt.response == nil || attempt.response.Len() == 0 {
			continue
		}
		text := attempt.response.String()
		combined.WriteString(text)
		if !strings.HasSuffix(text, "\n") {
			combined.WriteByte('\n')
		}
		if idx != lastIdx {
			combined.WriteByte('\n')
		}
	}
	ginCtx.Set(apiResponseKey, []byte(combined.String()))
}
// apiRequestSource looks up the file-backed request log source stored on
// ginCtx under logging.APIRequestSourceContextKey. The boolean is false when
// no usable source is present.
func apiRequestSource(ginCtx *gin.Context) (*logging.FileBodySource, bool) {
	source, ok := fileBodySourceFromGin(ginCtx, logging.APIRequestSourceContextKey)
	return source, ok
}
// apiResponseSourceOrNil returns the file-backed response log source stored on
// ginCtx under logging.APIResponseSourceContextKey, or nil when no usable
// source is present.
func apiResponseSourceOrNil(ginCtx *gin.Context) *logging.FileBodySource {
	if source, ok := fileBodySourceFromGin(ginCtx, logging.APIResponseSourceContextKey); ok {
		return source
	}
	return nil
}
// appendAPIWebsocketTimeline adds one event to the websocket timeline.
//
// The chunk is whitespace-trimmed and ignored when empty. The file-backed
// timeline source is preferred; when it is absent or the append fails, the
// event is merged into the []byte stored under apiWebsocketTimelineKey,
// separated from earlier content by a blank line. The first in-memory event is
// stored as a copy of the trimmed chunk.
func appendAPIWebsocketTimeline(ginCtx *gin.Context, chunk []byte) {
	if ginCtx == nil {
		return
	}
	entry := bytes.TrimSpace(chunk)
	if len(entry) == 0 {
		return
	}

	if source, ok := apiWebsocketTimelineSource(ginCtx); ok {
		errAppend := source.AppendPart(entry)
		if errAppend == nil {
			return
		}
		log.WithError(errAppend).Warn("failed to append api websocket timeline log part")
	}

	// In-memory fallback: a missing, mistyped or empty stored value all count
	// as "no earlier content".
	var previous []byte
	if stored, found := ginCtx.Get(apiWebsocketTimelineKey); found {
		previous, _ = stored.([]byte)
	}
	if len(previous) == 0 {
		ginCtx.Set(apiWebsocketTimelineKey, bytes.Clone(entry))
		return
	}

	merged := make([]byte, 0, len(previous)+len(entry)+2)
	merged = append(merged, previous...)
	if !bytes.HasSuffix(previous, []byte("\n")) {
		merged = append(merged, '\n')
	}
	merged = append(merged, '\n')
	merged = append(merged, entry...)
	ginCtx.Set(apiWebsocketTimelineKey, merged)
}
// apiWebsocketTimelineSource looks up the file-backed websocket timeline
// source stored on ginCtx under logging.APIWebsocketTimelineSourceContextKey.
// The boolean is false when no usable source is present.
func apiWebsocketTimelineSource(ginCtx *gin.Context) (*logging.FileBodySource, bool) {
	source, ok := fileBodySourceFromGin(ginCtx, logging.APIWebsocketTimelineSourceContextKey)
	return source, ok
}
// fileBodySourceFromGin fetches the value stored on ginCtx under key and
// returns it as a *logging.FileBodySource. The boolean is true only when the
// value exists, has that type, and is a non-nil pointer.
func fileBodySourceFromGin(ginCtx *gin.Context, key string) (*logging.FileBodySource, bool) {
	if ginCtx == nil {
		return nil, false
	}
	if stored, found := ginCtx.Get(key); found {
		source, ok := stored.(*logging.FileBodySource)
		return source, ok && source != nil
	}
	return nil, false
}
// markAPIResponseTimestamp stores the current time on ginCtx under
// "API_RESPONSE_TIMESTAMP" unless a value is already present, so only the
// first call per context takes effect.
func markAPIResponseTimestamp(ginCtx *gin.Context) {
	const timestampKey = "API_RESPONSE_TIMESTAMP"
	if ginCtx == nil {
		return
	}
	if _, alreadySet := ginCtx.Get(timestampKey); !alreadySet {
		ginCtx.Set(timestampKey, time.Now())
	}
}
// writeHeaders renders headers into builder, one "Name: value" line per value,
// with names in sorted order and every value passed through
// util.MaskSensitiveHeaderValue. A header with no values is written as
// "Name:". An empty header set is written as "<none>". A nil builder is
// ignored.
func writeHeaders(builder *strings.Builder, headers http.Header) {
	if builder == nil {
		return
	}
	if len(headers) == 0 {
		builder.WriteString("<none>\n")
		return
	}

	names := make([]string, 0, len(headers))
	for name := range headers {
		names = append(names, name)
	}
	// Map iteration order is random; sort for stable log output.
	sort.Strings(names)

	for _, name := range names {
		values := headers[name]
		if len(values) == 0 {
			fmt.Fprintf(builder, "%s:\n", name)
			continue
		}
		for _, value := range values {
			fmt.Fprintf(builder, "%s: %s\n", name, util.MaskSensitiveHeaderValue(name, value))
		}
	}
}
// formatAuthInfo builds the comma-separated credential summary used on the
// "Auth:" log line from the provider, auth id, label and auth type/value of
// info. Blank fields are skipped; the result is "" when nothing is set.
//
// The auth type is compared case-insensitively: "api_key" values go through
// util.HideAPIKey, "oauth" never prints a value, and any other type prints its
// value verbatim.
//
// NOTE(review): the verbatim branch may write a raw credential into the log
// for auth types other than "api_key"/"oauth" — confirm that is acceptable.
func formatAuthInfo(info UpstreamRequestLog) string {
	var parts []string
	labelled := []struct{ label, value string }{
		{"provider", info.Provider},
		{"auth_id", info.AuthID},
		{"label", info.AuthLabel},
	}
	for _, field := range labelled {
		if value := strings.TrimSpace(field.value); value != "" {
			parts = append(parts, field.label+"="+value)
		}
	}

	authType := strings.ToLower(strings.TrimSpace(info.AuthType))
	authValue := strings.TrimSpace(info.AuthValue)
	switch {
	case authType == "":
		// No auth type: nothing further to report.
	case authType == "oauth":
		parts = append(parts, "type=oauth")
	case authValue == "":
		parts = append(parts, "type="+authType)
	case authType == "api_key":
		parts = append(parts, fmt.Sprintf("type=api_key value=%s", util.HideAPIKey(authValue)))
	default:
		parts = append(parts, "type="+authType+" value="+authValue)
	}
	return strings.Join(parts, ", ")
}
func SummarizeErrorBody(contentType string, body []byte) string {
isHTML := strings.Contains(strings.ToLower(contentType), "text/html")
if !isHTML {
trimmed := bytes.TrimSpace(bytes.ToLower(body))
if bytes.HasPrefix(trimmed, []byte("<!doctype html")) || bytes.HasPrefix(trimmed, []byte("<html")) {
isHTML = true
}
}
if isHTML {
if title := extractHTMLTitle(body); title != "" {
return title
}
return "[html body omitted]"
}
// Try to extract error message from JSON response
if message := extractJSONErrorMessage(body); message != "" {
return message
}
return string(body)
}
// extractHTMLTitle returns the text of the first <title> element in body,
// HTML-unescaped and with runs of whitespace collapsed to single spaces. It
// returns "" when there is no complete <title>...</title> pair or the title is
// blank. Tag matching is ASCII case-insensitive.
func extractHTMLTitle(body []byte) string {
	// Fold only ASCII letters so that lower has exactly the same length as
	// body. bytes.ToLower can change the byte length of non-ASCII or invalid
	// UTF-8 input, which would make offsets found in the folded copy point at
	// the wrong place in body (or past its end).
	lower := make([]byte, len(body))
	for i, c := range body {
		if 'A' <= c && c <= 'Z' {
			c += 'a' - 'A'
		}
		lower[i] = c
	}

	start := bytes.Index(lower, []byte("<title"))
	if start == -1 {
		return ""
	}
	// Skip any attributes up to the end of the opening tag.
	gt := bytes.IndexByte(lower[start:], '>')
	if gt == -1 {
		return ""
	}
	start += gt + 1
	end := bytes.Index(lower[start:], []byte("</title>"))
	if end == -1 {
		return ""
	}

	title := strings.TrimSpace(html.UnescapeString(string(body[start : start+end])))
	if title == "" {
		return ""
	}
	return strings.Join(strings.Fields(title), " ")
}
// extractJSONErrorMessage returns the string form of the "error.message" path
// in body as resolved by gjson, or "" when the path does not exist or its
// value is empty.
func extractJSONErrorMessage(body []byte) string {
	if message := gjson.GetBytes(body, "error.message"); message.Exists() {
		return message.String()
	}
	return ""
}
// LogWithRequestID returns a logrus entry carrying a "request_id" field taken
// from ctx. When ctx is nil or holds no request ID, it returns a plain entry
// on the standard logger instead.
func LogWithRequestID(ctx context.Context) *log.Entry {
	if ctx != nil {
		if requestID := logging.GetRequestID(ctx); requestID != "" {
			return log.WithField("request_id", requestID)
		}
	}
	return log.NewEntry(log.StandardLogger())
}
// MarkCreditsUsed sets the credits-used flag to true on the Gin context
// carried by ctx. It does nothing when ctx carries no Gin context.
func MarkCreditsUsed(ctx context.Context) {
	if ginCtx := ginContextFrom(ctx); ginCtx != nil {
		ginCtx.Set(creditsUsedKey, true)
	}
}
// CreditsUsed reports the credits-used flag stored on the Gin context carried
// by ctx. It returns false when there is no Gin context, the flag was never
// set, or the stored value is not a bool.
func CreditsUsed(ctx context.Context) bool {
	ginCtx := ginContextFrom(ctx)
	if ginCtx == nil {
		return false
	}
	stored, found := ginCtx.Get(creditsUsedKey)
	if !found {
		return false
	}
	used, _ := stored.(bool)
	return used
}