mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-24 03:28:23 +08:00
* feat(plugin): add ModelRouter before auth with single-slot routing targets

## Motivation

Plugins that need to change execution based on the **original inbound request** (protocol format, raw body, headers, query, stream flag, metadata, etc.) often resorted to virtual/trampoline models or routing inside interceptors.

This commit adds **ModelRouter**: a pluggable layer **before** model-to-provider resolution and AuthManager credential selection, so plugins can declare who executes a request without spoofing the client model name.

This is a **new capability**, not a bugfix on the existing chain. With no ModelRouter plugins loaded, behavior matches upstream.

## Pipeline placement

- `execute`, `stream`, and `count` (and image paths via AuthManager) call `applyModelRouter()` before building `coreexecutor.Request`.
- Routing runs **before** the request interceptor (before auth), so routers see the client’s original context. After a plugin executor is chosen, the existing **after-auth interceptor → response/stream interceptor** chain still applies.
- Internal `ExecuteModel` / `ExecuteModelStream` (host callbacks) support `SkipRouterPluginID` so nested calls do not re-enter the same router.

## Routing API (single slot, mutually exclusive)

`ModelRouteResponse` uses **one target slot** to avoid ambiguity when both `TargetExecutorPluginID` and `TargetProvider` were set and the host ignored one:

| Field | Meaning |
|-------|---------|
| `Handled` | `false`: this router declines; try the next router or default path |
| `TargetKind` | `self` \| `executor` \| `provider` (pick one) |
| `Target` | `self`/`executor`: plugin ID; `provider`: built-in provider key |
| `TargetModel` | Optional on `provider` only; empty keeps client `RequestedModel` |
| `Reason` | Optional diagnostic text |

- **self**: the router plugin’s own executor (`Target` normalized to the router’s plugin ID).
- **executor**: another plugin’s executor; host pre-checks with `executorPluginReady()` (executor declared and provider identifier resolvable) to avoid handled routes that 500 at execution.
- **provider**: skip registry model resolution; fixed built-in AuthManager path; optional `TargetModel` for execution model only; it **does not** change outward requested-model metadata.

Routers run in **descending plugin priority** (tie-break: ascending plugin ID). Panic, error, invalid target, or unavailable executor/provider → log and **fall through to the next router**; if none handle, use the original provider+auth flow.

## Context exposed to routers

`ModelRouteRequest` includes:

- `SourceFormat`, `RequestedModel`, `Stream`
- `Headers`, `Query`, `Body` (defensive copies)
- `Metadata` (best-effort read-only context snapshot)
- `AvailableProviders`: built-in provider keys with at least one **non-disabled** auth (`AuthManager.AvailableProviders()`). **Does not** reflect per-model cooldown or transient unavailability; treat as an optimistic snapshot.

Adds `AuthManager.HasProviderAuth()` and `AvailableProviders()`, excluding `Disabled` and `StatusDisabled` auths consistently with credential selection.

## Host and RPC

- Go plugins: `pluginapi.ModelRouter` + `RouteModel()`.
- RPC plugins: `pluginabi.MethodModelRoute` (`model.route`), capability flag `model_router`.
- `pluginhost.Host` implements `RouteModel` / `RouteModelExcept`; handlers use `SetModelRouterHost` or a `PluginHost` type assertion; **direct executor** paths use `ExecutePluginExecutor*` / `CountPluginExecutor`.
- No bundled example ModelRouter plugin; capability is active only when a third-party plugin declares `model_router` and loads.

## Plugin RPC schema (policy A, upstream-aligned)

- `pluginabi.SchemaVersion` stays **1**: capability additions (`model_router`, `model.route`) do not bump the number; increment only on breaking RPC JSON changes.
- Host sends `schema_version` at register; reject only if the plugin declares a **higher** version than the host.
- No unpublished “ModelRouter requires schema ≥ 3” gate (v3 single-slot API was never public).
- Existing plugins and examples without `model_router` (`schema_version: 1`) need no changes.
- RPC ModelRouter: `schema_version: 1` + `model_router: true` + implement `model.route`.

## Path consistency within this commit

- Provider routes reuse image-only model checks (e.g. `gpt-image-2`) on the normalized model, same as the default AuthManager path.
- `count` aligned with execute/stream: `SkipRouterPluginID`, query/headers injection, interceptor skip semantics.
- Handlers: `modelRoutersEnabled` treats hosts without `HasModelRouters` as disabled (same as before ModelRouter existed); `pluginhost.Host` implements the detector.
- API docs: `ModelRouter` explicitly includes built-in **provider** targets (in addition to plugin executors and the router’s own executor).

## Testing

    go test ./internal/pluginhost ./sdk/api/handlers ./sdk/pluginapi ./sdk/pluginabi ./sdk/cliproxy/auth
    go build -o test-output ./cmd/server && rm test-output
    go test ./...

* fix(handlers): address ModelRouter review feedback

- Use modelExecutionQuery for plugin executor and AuthManager paths so inbound URL query matches router/header behavior
- Guard queryFromContext when gin Request.URL is nil
- Read plugin executor stream chunks via nextStreamChunk to exit on cancel
- Drop redundant clonePluginMetadata on capability record meta

Tests cover query propagation, stream cancel, and nil URL safety.

* feat(plugin): add Claude web search router example

Add a Claude Code web_search ModelRouter example that can route matching Claude requests through Antigravity, Codex, xAI, or Tavily.

The plugin includes executor orchestration, backend fallback/penalty handling, Tavily API key support, Claude-compatible response assembly, stream forwarding, and focused unit coverage for detection, fallback routing, model resolution, penalties, stream forwarding, and Tavily behavior.

Verification: go test -count=1 ./... in examples/plugin/claude-web-search-router/go; go build -buildmode=c-shared for the plugin; go build ./cmd/server; live local CPA curl coverage for plugin load, four explicit routes, fallback, and Codex spark routing.

* fix(pluginhost): validate executor routes before fallback

* fix(pluginhost): skip oauth-only executor routes
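
The router ordering and fall-through rule from the first commit message (descending plugin priority, ties broken by ascending plugin ID, and panic/error/invalid target falling through to the next router) can be illustrated with a minimal sketch. Everything here is a stand-in written for illustration: `routeResponse`, `router`, `validTarget`, `safeRoute`, and `firstHandled` are hypothetical names, not the host's actual types or functions, and the validity check is an assumed simplification of whatever the host really enforces.

```go
package main

import (
	"fmt"
	"sort"
)

// routeResponse is a hypothetical stand-in for the single-slot response
// described in the commit message (not the real pluginapi type).
type routeResponse struct {
	Handled     bool
	TargetKind  string // "self", "executor", or "provider"
	Target      string
	TargetModel string
	Reason      string
}

// router is a hypothetical stand-in for a loaded ModelRouter plugin.
type router struct {
	ID       string
	Priority int
	Route    func(requestedModel string) (routeResponse, error)
}

// validTarget is an assumed, simplified check: "self" needs no target,
// while "executor" and "provider" need a non-empty one.
func validTarget(r routeResponse) bool {
	switch r.TargetKind {
	case "self":
		return true
	case "executor", "provider":
		return r.Target != ""
	default:
		return false
	}
}

// safeRoute turns a router panic into an error so the caller can fall through.
func safeRoute(r router, model string) (resp routeResponse, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("router %s panicked: %v", r.ID, rec)
		}
	}()
	return r.Route(model)
}

// firstHandled tries routers in descending priority (ties: ascending ID) and
// returns the first valid handled response. A false result means the caller
// should use the default provider+auth flow.
func firstHandled(routers []router, model string) (routeResponse, bool) {
	ordered := append([]router(nil), routers...)
	sort.SliceStable(ordered, func(i, j int) bool {
		if ordered[i].Priority != ordered[j].Priority {
			return ordered[i].Priority > ordered[j].Priority
		}
		return ordered[i].ID < ordered[j].ID
	})
	for _, r := range ordered {
		resp, err := safeRoute(r, model)
		if err != nil || !resp.Handled || !validTarget(resp) {
			continue // declined, failed, or invalid: try the next router
		}
		if resp.TargetKind == "self" {
			resp.Target = r.ID // normalize self to the router's own plugin ID
		}
		return resp, true
	}
	return routeResponse{}, false
}

func main() {
	routers := []router{
		{ID: "declines", Priority: 10, Route: func(string) (routeResponse, error) {
			return routeResponse{Handled: false}, nil
		}},
		{ID: "web-search", Priority: 5, Route: func(string) (routeResponse, error) {
			return routeResponse{Handled: true, TargetKind: "self"}, nil
		}},
	}
	resp, ok := firstHandled(routers, "example-model")
	fmt.Println(ok, resp.TargetKind, resp.Target)
}
```

The sketch omits the readiness pre-check for executor targets and the availability check for provider targets, both of which the commit message says the host performs before accepting a handled route.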
335 lines
11 KiB
Go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"

	"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginabi"
	"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
)

type executionPlan struct {
	backend routeBackend
	model   string
}

func buildExecutionPlans(cfg pluginConfig, req pluginapi.ModelRouteRequest) []executionPlan {
	return buildExecutionPlansInternal(cfg, req, true)
}

func buildExecutionPlansForExecute(cfg pluginConfig, req pluginapi.ModelRouteRequest) []executionPlan {
	route := strings.TrimSpace(cfg.Route)
	if isFallbackRoute(route) {
		return buildExecutionPlansInternal(cfg, req, false)
	}
	return executionPlansForExecuteRoute(cfg, req, route)
}

// executionPlansForExecuteRoute builds plans for plugin executor without requiring
// ModelRouteRequest.AvailableProviders (host does not pass it on executor.execute_stream).
func executionPlansForExecuteRoute(cfg pluginConfig, req pluginapi.ModelRouteRequest, route string) []executionPlan {
	backend := routeBackend(strings.TrimSpace(route))
	if !backendRunnableLenient(backend, cfg, req) {
		return nil
	}
	var plans []executionPlan
	switch backend {
	case backendAntigravityGoogle:
		model := resolveAntigravityWebSearchTargetModel(cfg.AntigravityModel, req.RequestedModel)
		if model == "" {
			return nil
		}
		plans = append(plans, executionPlan{backend: backend, model: model})
	case backendCodexWebSearch:
		plans = append(plans, executionPlan{backend: backend, model: resolveCodexWebSearchTargetModel(cfg.CodexModel)})
	case backendXAIWebSearch:
		plans = append(plans, executionPlan{backend: backend, model: resolveXAIWebSearchTargetModel(cfg.XAIModel)})
	case backendTavily:
		if !newTavilyClient(cfg.TavilyAPIKeys).available() {
			return nil
		}
		plans = append(plans, executionPlan{backend: backend})
	default:
		return nil
	}
	return plans
}

func buildExecutionPlansInternal(cfg pluginConfig, req pluginapi.ModelRouteRequest, requireProviders bool) []executionPlan {
	var plans []executionPlan
	for _, backend := range defaultWebSearchFallbackChain() {
		if requireProviders {
			if _, ok := tryRouteBackend(backend, cfg, req); !ok {
				continue
			}
		} else if !backendRunnableLenient(backend, cfg, req) {
			continue
		}
		switch backend {
		case backendAntigravityGoogle:
			plans = append(plans, executionPlan{
				backend: backend,
				model:   resolveAntigravityWebSearchTargetModel(cfg.AntigravityModel, req.RequestedModel),
			})
		case backendCodexWebSearch:
			plans = append(plans, executionPlan{
				backend: backend,
				model:   resolveCodexWebSearchTargetModel(cfg.CodexModel),
			})
		case backendXAIWebSearch:
			plans = append(plans, executionPlan{
				backend: backend,
				model:   resolveXAIWebSearchTargetModel(cfg.XAIModel),
			})
		case backendTavily:
			plans = append(plans, executionPlan{backend: backend})
		default:
			continue
		}
	}
	return plans
}

func backendRunnableLenient(backend routeBackend, cfg pluginConfig, req pluginapi.ModelRouteRequest) bool {
	switch backend {
	case backendTavily:
		return newTavilyClient(cfg.TavilyAPIKeys).available()
	case backendAntigravityGoogle:
		return resolveAntigravityWebSearchTargetModel(cfg.AntigravityModel, req.RequestedModel) != ""
	case backendCodexWebSearch, backendXAIWebSearch:
		return true
	default:
		return false
	}
}

func executionPlansForRoute(cfg pluginConfig, req pluginapi.ModelRouteRequest, route string) []executionPlan {
	if isFallbackRoute(route) {
		return buildExecutionPlans(cfg, req)
	}
	backend := routeBackend(strings.TrimSpace(route))
	if _, ok := tryRouteBackend(backend, cfg, req); !ok {
		return nil
	}
	var plans []executionPlan
	for _, b := range []routeBackend{backend} {
		if !backendRunnableLenient(b, cfg, req) {
			continue
		}
		switch b {
		case backendAntigravityGoogle:
			plans = append(plans, executionPlan{backend: b, model: resolveAntigravityWebSearchTargetModel(cfg.AntigravityModel, req.RequestedModel)})
		case backendCodexWebSearch:
			plans = append(plans, executionPlan{backend: b, model: resolveCodexWebSearchTargetModel(cfg.CodexModel)})
		case backendXAIWebSearch:
			plans = append(plans, executionPlan{backend: b, model: resolveXAIWebSearchTargetModel(cfg.XAIModel)})
		case backendTavily:
			plans = append(plans, executionPlan{backend: b})
		}
	}
	return plans
}

func claudeRequestBody(exec pluginapi.ExecutorRequest) []byte {
	if len(exec.OriginalRequest) > 0 {
		return exec.OriginalRequest
	}
	return exec.Payload
}

func runWebSearchWithExecutionFallback(ctx context.Context, exec pluginapi.ExecutorRequest, hostCallbackID string) ([]byte, http.Header, error) {
	cfg := loadedConfig()
	req := pluginapi.ModelRouteRequest{
		SourceFormat:       "claude",
		RequestedModel:     strings.TrimSpace(exec.Model),
		Body:               claudeRequestBody(exec),
		AvailableProviders: availableProvidersFromMetadata(exec.Metadata),
	}
	return runOrderedExecutionPlans(ctx, exec, hostCallbackID, cfg, buildExecutionPlansForExecute(cfg, req), false)
}

// runWebSearchStreamWithExecutionFallback buffers the full host stream (non-streaming RPC path only).
func runWebSearchStreamWithExecutionFallback(ctx context.Context, exec pluginapi.ExecutorRequest, hostCallbackID string) ([]byte, http.Header, error) {
	cfg := loadedConfig()
	req := pluginapi.ModelRouteRequest{
		SourceFormat:       "claude",
		RequestedModel:     strings.TrimSpace(exec.Model),
		Body:               claudeRequestBody(exec),
		AvailableProviders: availableProvidersFromMetadata(exec.Metadata),
	}
	return runOrderedExecutionPlans(ctx, exec, hostCallbackID, cfg, buildExecutionPlansForExecute(cfg, req), true)
}

func runOrderedExecutionPlans(ctx context.Context, exec pluginapi.ExecutorRequest, hostCallbackID string, cfg pluginConfig, plans []executionPlan, stream bool) ([]byte, http.Header, error) {
	if len(plans) == 0 {
		return nil, nil, fmt.Errorf("web search execution: no backend available")
	}
	backends := make([]routeBackend, 0, len(plans))
	for _, p := range plans {
		backends = append(backends, p.backend)
	}
	ordered := sortBackendsByPenalty(backends)
	planByBackend := make(map[routeBackend]executionPlan, len(plans))
	for _, p := range plans {
		planByBackend[p.backend] = p
	}

	body := claudeRequestBody(exec)
	var lastErr error
	for _, backend := range ordered {
		plan := planByBackend[backend]
		switch backend {
		case backendTavily:
			var payload []byte
			var headers http.Header
			var errRun error
			if stream {
				payload, headers, errRun = runTavilyClaudeStreamWithClient(ctx, exec, newTavilyClient(cfg.TavilyAPIKeys))
			} else {
				payload, headers, errRun = runTavilyClaudeWithClient(ctx, exec, newTavilyClient(cfg.TavilyAPIKeys))
			}
			if errRun != nil {
				lastErr = errRun
				continue
			}
			recordBackendSuccess(backend)
			return payload, headers, nil
		default:
			payload, status, errRun := hostModelExecuteClaude(ctx, hostCallbackID, plan.model, body, stream)
			if errRun != nil {
				lastErr = errRun
				if isRetryableHTTPStatus(hostHTTPStatusFromError(errRun)) {
					recordBackendFailure(backend)
				}
				continue
			}
			if isRetryableHTTPStatus(status) {
				recordBackendFailure(backend)
				lastErr = fmt.Errorf("host model status %d", status)
				continue
			}
			recordBackendSuccess(backend)
			headers := http.Header{"Content-Type": []string{"application/json"}}
			if stream {
				headers = http.Header{"Content-Type": []string{"text/event-stream"}}
			}
			return payload, headers, nil
		}
	}
	if lastErr != nil {
		return nil, nil, lastErr
	}
	return nil, nil, fmt.Errorf("web search execution: all backends failed")
}

func availableProvidersFromMetadata(meta map[string]any) []string {
	if meta == nil {
		return nil
	}
	raw, ok := meta["available_providers"]
	if !ok {
		return nil
	}
	switch v := raw.(type) {
	case []string:
		return v
	case []any:
		out := make([]string, 0, len(v))
		for _, item := range v {
			if s, okItem := item.(string); okItem {
				out = append(out, s)
			}
		}
		return out
	default:
		return nil
	}
}

func hostModelExecuteClaude(ctx context.Context, hostCallbackID, execModel string, body []byte, stream bool) ([]byte, int, error) {
	if stream {
		return hostModelStreamClaude(ctx, hostCallbackID, execModel, body)
	}
	raw, errCall := callHost(pluginabi.MethodHostModelExecute, hostModelExecutionRequest{
		HostModelExecutionRequest: pluginapi.HostModelExecutionRequest{
			EntryProtocol: "claude",
			ExitProtocol:  "claude",
			Model:         execModel,
			Stream:        false,
			Body:          body,
		},
		HostCallbackID: hostCallbackID,
	})
	if errCall != nil {
		return nil, hostHTTPStatusFromError(errCall), errCall
	}
	var resp pluginapi.HostModelExecutionResponse
	if errDecode := json.Unmarshal(raw, &resp); errDecode != nil {
		return nil, 0, errDecode
	}
	if resp.StatusCode >= 400 {
		return nil, resp.StatusCode, fmt.Errorf("host model status %d", resp.StatusCode)
	}
	return resp.Body, resp.StatusCode, nil
}

func hostModelStreamClaude(ctx context.Context, hostCallbackID, execModel string, body []byte) ([]byte, int, error) {
	raw, errCall := callHost(pluginabi.MethodHostModelExecuteStream, hostModelExecutionRequest{
		HostModelExecutionRequest: pluginapi.HostModelExecutionRequest{
			EntryProtocol: "claude",
			ExitProtocol:  "claude",
			Model:         execModel,
			Stream:        true,
			Body:          body,
		},
		HostCallbackID: hostCallbackID,
	})
	if errCall != nil {
		return nil, hostHTTPStatusFromError(errCall), errCall
	}
	var resp pluginapi.HostModelStreamResponse
	if errDecode := json.Unmarshal(raw, &resp); errDecode != nil {
		return nil, 0, errDecode
	}
	if resp.StatusCode >= 400 {
		_ = closeHostModelStream(resp.StreamID)
		return nil, resp.StatusCode, fmt.Errorf("host model status %d", resp.StatusCode)
	}
	if strings.TrimSpace(resp.StreamID) == "" {
		return nil, 0, fmt.Errorf("host model stream: empty stream_id")
	}
	defer func() { _ = closeHostModelStream(resp.StreamID) }()

	var buf bytes.Buffer
	for {
		chunkRaw, errRead := callHost(pluginabi.MethodHostModelStreamRead, pluginapi.HostModelStreamReadRequest{StreamID: resp.StreamID})
		if errRead != nil {
			return nil, hostHTTPStatusFromError(errRead), errRead
		}
		var chunk pluginapi.HostModelStreamReadResponse
		if errDecode := json.Unmarshal(chunkRaw, &chunk); errDecode != nil {
			return nil, 0, errDecode
		}
		if chunk.Error != "" {
			code := hostHTTPStatusFromError(fmt.Errorf("%s", chunk.Error))
			return nil, code, fmt.Errorf("%s", chunk.Error)
		}
		if len(chunk.Payload) > 0 {
			buf.Write(chunk.Payload)
		}
		if chunk.Done {
			break
		}
	}
	return buf.Bytes(), http.StatusOK, nil
}

func closeHostModelStream(streamID string) error {
	_, errCall := callHost(pluginabi.MethodHostModelStreamClose, pluginapi.HostModelStreamCloseRequest{StreamID: streamID})
	return errCall
}