package executor import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" "sort" "strings" "time" xaiauth "github.com/router-for-me/CLIProxyAPI/v7/internal/auth/xai" "github.com/router-for-me/CLIProxyAPI/v7/internal/config" "github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps" "github.com/router-for-me/CLIProxyAPI/v7/internal/thinking" "github.com/router-for-me/CLIProxyAPI/v7/internal/util" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" "github.com/tiktoken-go/tokenizer" ) var xaiDataTag = []byte("data:") const ( xaiImageHandlerType = "openai-image" xaiVideoHandlerType = "openai-video" xaiCustomToolType = "custom" xaiFunctionToolType = "function" xaiImageGenerationToolType = "image_generation" xaiNamespaceToolType = "namespace" xaiToolSearchType = "tool_search" xaiWebSearchToolType = "web_search" xaiImagesGenerationsPath = "/images/generations" xaiImagesEditsPath = "/images/edits" xaiDefaultImageEndpointPath = xaiImagesGenerationsPath xaiVideosGenerationsPath = "/videos/generations" xaiVideosEditsPath = "/videos/edits" xaiVideosExtensionsPath = "/videos/extensions" xaiVideosPath = "/videos" xaiIdempotencyKeyMetaKey = "idempotency_key" ) // XAIExecutor is a stateless executor for xAI Grok's Responses API. type XAIExecutor struct { cfg *config.Config } // NewXAIExecutor creates a new xAI executor. func NewXAIExecutor(cfg *config.Config) *XAIExecutor { return &XAIExecutor{cfg: cfg} } // Identifier returns the provider identifier. func (e *XAIExecutor) Identifier() string { return "xai" } // PrepareRequest injects xAI credentials into the outgoing HTTP request. func (e *XAIExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error { if req == nil { return nil } token, _ := xaiCreds(auth) if strings.TrimSpace(token) != "" { req.Header.Set("Authorization", "Bearer "+token) } var attrs map[string]string if auth != nil { attrs = auth.Attributes } util.ApplyCustomHeadersFromAttrs(req, attrs) return nil } // HttpRequest injects xAI credentials into the request and executes it. func (e *XAIExecutor) HttpRequest(ctx context.Context, auth *cliproxyauth.Auth, req *http.Request) (*http.Response, error) { if req == nil { return nil, fmt.Errorf("xai executor: request is nil") } if ctx == nil { ctx = req.Context() } httpReq := req.WithContext(ctx) if errPrepare := e.PrepareRequest(httpReq, auth); errPrepare != nil { return nil, errPrepare } httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) return httpClient.Do(httpReq) } func (e *XAIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { if endpointPath := xaiImageEndpointPath(opts); endpointPath != "" { return e.executeImages(ctx, auth, req, endpointPath) } if xaiIsVideoRequest(opts) { return e.executeVideos(ctx, auth, req, opts) } token, baseURL := xaiCreds(auth) if baseURL == "" { baseURL = xaiauth.DefaultAPIBaseURL } prepared, err := e.prepareResponsesRequest(ctx, req, opts, true) if err != nil { return resp, err } reporter := helps.NewExecutorUsageReporter(ctx, e, prepared.baseModel, auth) defer reporter.TrackFailure(ctx, &err) reporter.SetTranslatedReasoningEffort(prepared.body, e.Identifier()) url := strings.TrimSuffix(baseURL, "/") + "/responses" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(prepared.body)) if err != nil { return resp, err } applyXAIHeaders(httpReq, auth, token, true, prepared.sessionID) e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("xai executor: close response body error: %v", errClose) } }() helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { data, errRead := io.ReadAll(httpResp.Body) if errRead != nil { helps.RecordAPIResponseError(ctx, e.cfg, errRead) return resp, errRead } helps.AppendAPIResponseChunk(ctx, e.cfg, data) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return resp, statusErr{code: httpResp.StatusCode, msg: string(data)} } data, err := io.ReadAll(httpResp.Body) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } helps.AppendAPIResponseChunk(ctx, e.cfg, data) outputItemsByIndex := make(map[int64][]byte) var outputItemsFallback [][]byte for _, line := range bytes.Split(data, []byte("\n")) { if !bytes.HasPrefix(line, xaiDataTag) { continue } eventData := bytes.TrimSpace(line[len(xaiDataTag):]) switch gjson.GetBytes(eventData, "type").String() { case "response.output_item.done": xaiCollectOutputItemDone(eventData, outputItemsByIndex, &outputItemsFallback) case "response.completed": if detail, ok := helps.ParseCodexUsage(eventData); ok { reporter.Publish(ctx, detail) } completedData := xaiPatchCompletedOutput(eventData, outputItemsByIndex, outputItemsFallback) var param any out := sdktranslator.TranslateNonStream(ctx, prepared.to, prepared.from, req.Model, prepared.originalPayload, prepared.body, completedData, ¶m) return cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}, nil } } return resp, statusErr{code: http.StatusRequestTimeout, msg: "xai stream error: stream disconnected before response.completed"} } func (e *XAIExecutor) executeImages(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, endpointPath string) (resp cliproxyexecutor.Response, err error) { token, baseURL := xaiCreds(auth) if baseURL == "" { baseURL = xaiauth.DefaultAPIBaseURL } if endpointPath == "" { endpointPath = xaiDefaultImageEndpointPath } url := strings.TrimSuffix(baseURL, "/") + endpointPath httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(req.Payload)) if err != nil { return resp, err } applyXAIHeaders(httpReq, auth, token, false, "") e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), req.Payload) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("xai executor: close response body error: %v", errClose) } }() helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) data, err := io.ReadAll(httpResp.Body) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } helps.AppendAPIResponseChunk(ctx, e.cfg, data) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return resp, statusErr{code: httpResp.StatusCode, msg: string(data)} } return cliproxyexecutor.Response{Payload: data, Headers: httpResp.Header.Clone()}, nil } func (e *XAIExecutor) executeVideos(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { token, baseURL := xaiCreds(auth) if baseURL == "" { baseURL = xaiauth.DefaultAPIBaseURL } method := http.MethodPost endpointPath := xaiVideosGenerationsPath var body io.Reader = bytes.NewReader(req.Payload) switch path := xaiVideoEndpointPath(opts); path { case xaiVideosGenerationsPath, xaiVideosEditsPath, xaiVideosExtensionsPath: endpointPath = path default: if requestID := strings.TrimSpace(gjson.GetBytes(req.Payload, "request_id").String()); requestID != "" { method = http.MethodGet endpointPath = xaiVideosPath + "/" + url.PathEscape(requestID) body = nil } } requestURL := strings.TrimSuffix(baseURL, "/") + endpointPath httpReq, err := http.NewRequestWithContext(ctx, method, requestURL, body) if err != nil { return resp, err } applyXAIHeaders(httpReq, auth, token, false, "") if method == http.MethodPost { key := xaiMetadataString(opts.Metadata, xaiIdempotencyKeyMetaKey) if key == "" && opts.Headers != nil { key = strings.TrimSpace(opts.Headers.Get("x-idempotency-key")) } if key != "" { httpReq.Header.Set("x-idempotency-key", key) } } e.recordXAIRequest(ctx, auth, requestURL, httpReq.Header.Clone(), req.Payload) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("xai executor: close response body error: %v", errClose) } }() helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) data, err := io.ReadAll(httpResp.Body) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return resp, err } helps.AppendAPIResponseChunk(ctx, e.cfg, data) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return resp, statusErr{code: httpResp.StatusCode, msg: string(data)} } return cliproxyexecutor.Response{Payload: data, Headers: httpResp.Header.Clone()}, nil } func (e *XAIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) { token, baseURL := xaiCreds(auth) if baseURL == "" { baseURL = xaiauth.DefaultAPIBaseURL } prepared, err := e.prepareResponsesRequest(ctx, req, opts, true) if err != nil { return nil, err } reporter := helps.NewExecutorUsageReporter(ctx, e, prepared.baseModel, auth) defer reporter.TrackFailure(ctx, &err) reporter.SetTranslatedReasoningEffort(prepared.body, e.Identifier()) url := strings.TrimSuffix(baseURL, "/") + "/responses" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(prepared.body)) if err != nil { return nil, err } applyXAIHeaders(httpReq, auth, token, true, prepared.sessionID) e.recordXAIRequest(ctx, auth, url, httpReq.Header.Clone(), prepared.body) httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient = reporter.TrackHTTPClient(httpClient) httpResp, err := httpClient.Do(httpReq) if err != nil { helps.RecordAPIResponseError(ctx, e.cfg, err) return nil, err } helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { data, errRead := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("xai executor: close response body error: %v", errClose) } if errRead != nil { helps.RecordAPIResponseError(ctx, e.cfg, errRead) return nil, errRead } helps.AppendAPIResponseChunk(ctx, e.cfg, data) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) return nil, statusErr{code: httpResp.StatusCode, msg: string(data)} } out := make(chan cliproxyexecutor.StreamChunk) go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("xai executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, 52_428_800) var param any outputItemsByIndex := make(map[int64][]byte) var outputItemsFallback [][]byte for scanner.Scan() { line := scanner.Bytes() helps.AppendAPIResponseChunk(ctx, e.cfg, line) translatedLine := bytes.Clone(line) if bytes.HasPrefix(line, xaiDataTag) { eventData := bytes.TrimSpace(line[len(xaiDataTag):]) switch gjson.GetBytes(eventData, "type").String() { case "response.output_item.done": xaiCollectOutputItemDone(eventData, outputItemsByIndex, &outputItemsFallback) case "response.completed": if detail, ok := helps.ParseCodexUsage(eventData); ok { reporter.Publish(ctx, detail) } eventData = xaiPatchCompletedOutput(eventData, outputItemsByIndex, outputItemsFallback) translatedLine = append([]byte("data: "), eventData...) } } chunks := sdktranslator.TranslateStream(ctx, prepared.to, prepared.from, req.Model, prepared.originalPayload, prepared.body, translatedLine, ¶m) for i := range chunks { select { case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: case <-ctx.Done(): return } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx, errScan) select { case out <- cliproxyexecutor.StreamChunk{Err: errScan}: case <-ctx.Done(): } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil } // CountTokens estimates token count for xAI Responses requests. func (e *XAIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { prepared, err := e.prepareResponsesRequest(ctx, req, opts, false) if err != nil { return cliproxyexecutor.Response{}, err } enc, err := tokenizer.Get(tokenizer.Cl100kBase) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("xai executor: tokenizer init failed: %w", err) } count, err := enc.Count(string(prepared.body)) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("xai executor: token counting failed: %w", err) } usageJSON := fmt.Sprintf(`{"response":{"usage":{"input_tokens":%d,"output_tokens":0,"total_tokens":%d}}}`, count, count) translated := sdktranslator.TranslateTokenCount(ctx, prepared.to, prepared.from, int64(count), []byte(usageJSON)) return cliproxyexecutor.Response{Payload: translated}, nil } // Refresh refreshes xAI OAuth credentials using the stored refresh token. func (e *XAIExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("xai executor: refresh called") if refreshed, handled, err := helps.RefreshAuthViaHome(ctx, e.cfg, auth); handled { return refreshed, err } if auth == nil { return nil, statusErr{code: http.StatusInternalServerError, msg: "xai executor: auth is nil"} } refreshToken := xaiMetadataString(auth.Metadata, "refresh_token") if refreshToken == "" { return auth, nil } tokenEndpoint := xaiMetadataString(auth.Metadata, "token_endpoint") svc := xaiauth.NewXAIAuthWithProxyURL(e.cfg, auth.ProxyURL) td, err := svc.RefreshTokens(ctx, refreshToken, tokenEndpoint) if err != nil { return nil, err } if auth.Metadata == nil { auth.Metadata = make(map[string]any) } auth.Metadata["type"] = "xai" auth.Metadata["auth_kind"] = "oauth" auth.Metadata["access_token"] = td.AccessToken if td.RefreshToken != "" { auth.Metadata["refresh_token"] = td.RefreshToken } if td.IDToken != "" { auth.Metadata["id_token"] = td.IDToken } if td.TokenType != "" { auth.Metadata["token_type"] = td.TokenType } if td.ExpiresIn > 0 { auth.Metadata["expires_in"] = td.ExpiresIn } if td.Expire != "" { auth.Metadata["expired"] = td.Expire } if td.Email != "" { auth.Metadata["email"] = td.Email } if td.Subject != "" { auth.Metadata["sub"] = td.Subject } if tokenEndpoint != "" { auth.Metadata["token_endpoint"] = tokenEndpoint } if xaiMetadataString(auth.Metadata, "base_url") == "" { auth.Metadata["base_url"] = xaiauth.DefaultAPIBaseURL } auth.Metadata["last_refresh"] = time.Now().UTC().Format(time.RFC3339) if auth.Attributes == nil { auth.Attributes = make(map[string]string) } auth.Attributes["auth_kind"] = "oauth" if strings.TrimSpace(auth.Attributes["base_url"]) == "" { auth.Attributes["base_url"] = xaiauth.DefaultAPIBaseURL } return auth, nil } type xaiPreparedRequest struct { baseModel string from sdktranslator.Format to sdktranslator.Format originalPayload []byte body []byte sessionID string } func (e *XAIExecutor) prepareResponsesRequest(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, stream bool) (*xaiPreparedRequest, error) { baseModel := thinking.ParseSuffix(req.Model).ModelName from := opts.SourceFormat to := sdktranslator.FromString("codex") originalPayloadSource := req.Payload if len(opts.OriginalRequest) > 0 { originalPayloadSource = opts.OriginalRequest } originalPayload := bytes.Clone(originalPayloadSource) originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, stream) body := sdktranslator.TranslateRequest(from, to, baseModel, bytes.Clone(req.Payload), stream) var err error body, err = thinking.ApplyThinking(body, req.Model, from.String(), e.Identifier(), e.Identifier()) if err != nil { return nil, err } requestedModel := helps.PayloadRequestedModel(opts, req.Model) requestPath := helps.PayloadRequestPath(opts) body = helps.ApplyPayloadConfigWithRequest(e.cfg, baseModel, to.String(), from.String(), "", body, originalTranslated, requestedModel, requestPath, opts.Headers) body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", stream) body, _ = sjson.DeleteBytes(body, "previous_response_id") body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") body, _ = sjson.DeleteBytes(body, "stream_options") body = normalizeXAITools(body) body = normalizeXAIToolChoiceForTools(body) body = normalizeXAIInputReasoningItems(body) body = normalizeCodexInstructions(body) body = sanitizeXAIResponsesBody(body, baseModel) sessionID := xaiExecutionSessionID(req, opts) if sessionID != "" { body, _ = sjson.SetBytes(body, "prompt_cache_key", sessionID) } return &xaiPreparedRequest{ baseModel: baseModel, from: from, to: to, originalPayload: originalPayload, body: body, sessionID: sessionID, }, nil } func (e *XAIExecutor) recordXAIRequest(ctx context.Context, auth *cliproxyauth.Auth, url string, headers http.Header, body []byte) { var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: headers, Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) } func xaiCreds(auth *cliproxyauth.Auth) (token, baseURL string) { if auth == nil { return "", "" } if auth.Attributes != nil { token = strings.TrimSpace(auth.Attributes["api_key"]) baseURL = strings.TrimSpace(auth.Attributes["base_url"]) } if auth.Metadata != nil { if token == "" { token = xaiMetadataString(auth.Metadata, "access_token") } if baseURL == "" { baseURL = xaiMetadataString(auth.Metadata, "base_url") } } return token, baseURL } func applyXAIHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, stream bool, sessionID string) { r.Header.Set("Content-Type", "application/json") if strings.TrimSpace(token) != "" { r.Header.Set("Authorization", "Bearer "+token) } if stream { r.Header.Set("Accept", "text/event-stream") } else { r.Header.Set("Accept", "application/json") } r.Header.Set("Connection", "Keep-Alive") if sessionID != "" { r.Header.Set("x-grok-conv-id", sessionID) } var attrs map[string]string if auth != nil { attrs = auth.Attributes } util.ApplyCustomHeadersFromAttrs(r, attrs) } func xaiExecutionSessionID(req cliproxyexecutor.Request, opts cliproxyexecutor.Options) string { if value := xaiMetadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { return value } if value := xaiMetadataString(req.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); value != "" { return value } if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { return strings.TrimSpace(promptCacheKey.String()) } return "" } func xaiImageEndpointPath(opts cliproxyexecutor.Options) string { if opts.SourceFormat.String() != xaiImageHandlerType { return "" } path := xaiMetadataString(opts.Metadata, cliproxyexecutor.RequestPathMetadataKey) if strings.HasSuffix(path, "/images/edits") { return xaiImagesEditsPath } if strings.HasSuffix(path, "/images/generations") { return xaiImagesGenerationsPath } return xaiDefaultImageEndpointPath } func xaiIsVideoRequest(opts cliproxyexecutor.Options) bool { return opts.SourceFormat.String() == xaiVideoHandlerType } func xaiVideoEndpointPath(opts cliproxyexecutor.Options) string { if !xaiIsVideoRequest(opts) { return "" } path := xaiMetadataString(opts.Metadata, cliproxyexecutor.RequestPathMetadataKey) if strings.HasSuffix(path, "/videos/edits") { return xaiVideosEditsPath } if strings.HasSuffix(path, "/videos/extensions") { return xaiVideosExtensionsPath } if strings.HasSuffix(path, "/videos/generations") { return xaiVideosGenerationsPath } return "" } func xaiMetadataString(meta map[string]any, key string) string { if len(meta) == 0 || key == "" { return "" } value, ok := meta[key] if !ok || value == nil { return "" } switch typed := value.(type) { case string: return strings.TrimSpace(typed) case fmt.Stringer: return strings.TrimSpace(typed.String()) default: return strings.TrimSpace(fmt.Sprint(typed)) } } func sanitizeXAIResponsesBody(body []byte, model string) []byte { body = removeXAIEncryptedReasoningInclude(body) if !xaiSupportsReasoningEffort(model) { body, _ = sjson.DeleteBytes(body, "reasoning") } return body } func normalizeXAITools(body []byte) []byte { tools := gjson.GetBytes(body, "tools") if !tools.Exists() || !tools.IsArray() { return body } changed := false filtered := []byte(`[]`) for _, tool := range tools.Array() { toolType := tool.Get("type").String() if toolType == xaiNamespaceToolType { changed = true if namespaceTools := tool.Get("tools"); namespaceTools.IsArray() { for _, nestedTool := range namespaceTools.Array() { nestedRaw, nestedChanged, ok := normalizeXAITool(nestedTool) if !ok { return body } changed = changed || nestedChanged if len(nestedRaw) == 0 { continue } updated, errSet := sjson.SetRawBytes(filtered, "-1", nestedRaw) if errSet != nil { return body } filtered = updated } } continue } raw, toolChanged, ok := normalizeXAITool(tool) if !ok { return body } changed = changed || toolChanged if len(raw) == 0 { continue } updated, errSet := sjson.SetRawBytes(filtered, "-1", raw) if errSet != nil { return body } filtered = updated } if !changed { return body } updated, errSet := sjson.SetRawBytes(body, "tools", filtered) if errSet != nil { return body } return updated } // normalizeXAIToolChoiceForTools drops tool_choice and parallel_tool_calls // when tools are absent or empty (including after normalizeXAITools filtering). // xAI rejects payloads that include tool_choice without any tools defined. // Existence checks avoid unnecessary sjson parse/copy passes. func normalizeXAIToolChoiceForTools(body []byte) []byte { tools := gjson.GetBytes(body, "tools") hasTools := tools.Exists() && tools.IsArray() && len(tools.Array()) > 0 if hasTools { return body } if tools.Exists() { body, _ = sjson.DeleteBytes(body, "tools") } if gjson.GetBytes(body, "tool_choice").Exists() { body, _ = sjson.DeleteBytes(body, "tool_choice") } if gjson.GetBytes(body, "parallel_tool_calls").Exists() { body, _ = sjson.DeleteBytes(body, "parallel_tool_calls") } return body } func normalizeXAITool(tool gjson.Result) ([]byte, bool, bool) { toolType := tool.Get("type").String() changed := false if toolType == xaiToolSearchType || toolType == xaiImageGenerationToolType { return nil, true, true } raw := []byte(tool.Raw) if toolType == xaiCustomToolType { if tool.Get("name").String() == "apply_patch" { return nil, true, true } updatedTool, errSet := sjson.SetBytes(raw, "type", xaiFunctionToolType) if errSet != nil { return nil, false, false } raw = updatedTool toolType = xaiFunctionToolType changed = true } if toolType == xaiWebSearchToolType && tool.Get("external_web_access").Exists() { updatedTool, errDel := sjson.DeleteBytes(raw, "external_web_access") if errDel != nil { return nil, false, false } raw = updatedTool changed = true } if toolType == xaiFunctionToolType && !tool.Get("parameters").Exists() { updatedTool, errSet := sjson.SetRawBytes(raw, "parameters", []byte(`{"type":"object","properties":{}}`)) if errSet != nil { return nil, false, false } raw = updatedTool changed = true } return raw, changed, true } func normalizeXAIInputReasoningItems(body []byte) []byte { input := gjson.GetBytes(body, "input") if !input.Exists() || !input.IsArray() { return body } updated := body for i, item := range input.Array() { if item.Get("type").String() != "reasoning" { continue } contentPath := fmt.Sprintf("input.%d.content", i) if content := gjson.GetBytes(updated, contentPath); content.Exists() && content.Type == gjson.Null { updatedBody, errDel := sjson.DeleteBytes(updated, contentPath) if errDel != nil { return body } updated = updatedBody } encryptedContentPath := fmt.Sprintf("input.%d.encrypted_content", i) if encryptedContent := gjson.GetBytes(updated, encryptedContentPath); encryptedContent.Exists() && encryptedContent.Type == gjson.Null { updatedBody, errDel := sjson.DeleteBytes(updated, encryptedContentPath) if errDel != nil { return body } updated = updatedBody } } return mergeAdjacentXAIInputReasoningSummaries(updated) } func mergeAdjacentXAIInputReasoningSummaries(body []byte) []byte { input := gjson.GetBytes(body, "input") if !input.Exists() || !input.IsArray() { return body } changed := false items := make([]json.RawMessage, 0, len(input.Array())) for _, item := range input.Array() { if len(items) > 0 && canMergeXAIReasoningSummary(items[len(items)-1], item) { merged, ok := appendXAIReasoningSummary(items[len(items)-1], item.Get("summary").Array()) if ok { items[len(items)-1] = json.RawMessage(merged) changed = true continue } } items = append(items, json.RawMessage(item.Raw)) } if !changed { return body } rawInput, errMarshal := json.Marshal(items) if errMarshal != nil { return body } updated, errSet := sjson.SetRawBytes(body, "input", rawInput) if errSet != nil { return body } return updated } func canMergeXAIReasoningSummary(previous json.RawMessage, current gjson.Result) bool { previousItem := gjson.ParseBytes(previous) if previousItem.Get("type").String() != "reasoning" || current.Get("type").String() != "reasoning" { return false } if !previousItem.Get("summary").IsArray() || !current.Get("summary").IsArray() { return false } if len(current.Get("summary").Array()) == 0 { return false } for name := range current.Map() { if name != "type" && name != "summary" { return false } } return true } func appendXAIReasoningSummary(previous json.RawMessage, currentSummary []gjson.Result) ([]byte, bool) { updated := []byte(previous) summary := gjson.GetBytes(updated, "summary") if !summary.IsArray() { return previous, false } nextIndex := len(summary.Array()) for i, item := range currentSummary { updatedItem, errSet := sjson.SetRawBytes(updated, fmt.Sprintf("summary.%d", nextIndex+i), []byte(item.Raw)) if errSet != nil { return previous, false } updated = updatedItem } return updated, true } func removeXAIEncryptedReasoningInclude(body []byte) []byte { include := gjson.GetBytes(body, "include") if !include.Exists() || !include.IsArray() { return body } kept := make([]string, 0, len(include.Array())) for _, item := range include.Array() { value := strings.TrimSpace(item.String()) if value == "" || value == "reasoning.encrypted_content" { continue } kept = append(kept, value) } body, _ = sjson.SetBytes(body, "include", kept) return body } func xaiSupportsReasoningEffort(model string) bool { name := strings.ToLower(strings.TrimSpace(thinking.ParseSuffix(model).ModelName)) if idx := strings.LastIndex(name, "/"); idx >= 0 { name = name[idx+1:] } switch { case strings.HasPrefix(name, "grok-3-mini"): return true case strings.HasPrefix(name, "grok-4.20-multi-agent"): return true case strings.HasPrefix(name, "grok-4.3"): return true default: return false } } func xaiCollectOutputItemDone(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback *[][]byte) { itemResult := gjson.GetBytes(eventData, "item") if !itemResult.Exists() || itemResult.Type != gjson.JSON { return } outputIndexResult := gjson.GetBytes(eventData, "output_index") if outputIndexResult.Exists() { outputItemsByIndex[outputIndexResult.Int()] = []byte(itemResult.Raw) return } *outputItemsFallback = append(*outputItemsFallback, []byte(itemResult.Raw)) } func xaiPatchCompletedOutput(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback [][]byte) []byte { outputResult := gjson.GetBytes(eventData, "response.output") shouldPatchOutput := (!outputResult.Exists() || !outputResult.IsArray() || len(outputResult.Array()) == 0) && (len(outputItemsByIndex) > 0 || len(outputItemsFallback) > 0) if !shouldPatchOutput { return eventData } indexes := make([]int64, 0, len(outputItemsByIndex)) for idx := range outputItemsByIndex { indexes = append(indexes, idx) } sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] }) outputArray := []byte("[]") var buf bytes.Buffer buf.WriteByte('[') wrote := false for _, idx := range indexes { if wrote { buf.WriteByte(',') } buf.Write(outputItemsByIndex[idx]) wrote = true } for _, item := range outputItemsFallback { if wrote { buf.WriteByte(',') } buf.Write(item) wrote = true } buf.WriteByte(']') if wrote { outputArray = buf.Bytes() } patched, _ := sjson.SetRawBytes(eventData, "response.output", outputArray) return patched }