CLIProxyAPI/sdk/api/handlers/model_execution.go
sususu98 87132e54d7 feat(plugin): add ModelRouter before auth with single-slot routing targets (#3865)
* feat(plugin): add ModelRouter before auth with single-slot routing targets

## Motivation

Plugins that need to change execution based on the **original inbound request**
(protocol format, raw body, headers, query, stream flag, metadata, etc.) often
resorted to virtual/trampoline models or routing inside interceptors. This
commit adds **ModelRouter**: a pluggable layer **before** model-to-provider
resolution and AuthManager credential selection, so plugins can declare who
executes a request without spoofing the client model name.

This is a **new capability**, not a bugfix on the existing chain. With no
ModelRouter plugins loaded, behavior matches upstream.

## Pipeline placement

- `execute`, `stream`, and `count` (and image paths via AuthManager) call
  `applyModelRouter()` before building `coreexecutor.Request`.
- Routing runs **before** the request interceptor (before auth), so routers see
  the client’s original context. After a plugin executor is chosen, the existing
  **after-auth interceptor → response/stream interceptor** chain still applies.
- Internal `ExecuteModel` / `ExecuteModelStream` (host callbacks) support
  `SkipRouterPluginID` so nested calls do not re-enter the same router.

## Routing API (single slot, mutually exclusive)

`ModelRouteResponse` uses **one target slot**, which removes the ambiguity of having both
`TargetExecutorPluginID` and `TargetProvider` set while the host ignored one:

| Field | Meaning |
|-------|---------|
| `Handled` | `false`: this router declines; try the next router or default path |
| `TargetKind` | `self` \| `executor` \| `provider` (pick one) |
| `Target` | `self`/`executor`: plugin ID; `provider`: built-in provider key |
| `TargetModel` | Optional on `provider` only; empty keeps client `RequestedModel` |
| `Reason` | Optional diagnostic text |

- **self**: the router plugin’s own executor (`Target` normalized to the router’s plugin ID).
- **executor**: another plugin’s executor; host pre-checks with `executorPluginReady()`
  (executor declared and provider identifier resolvable) to avoid handled routes that 500 at execution.
- **provider**: skip registry model resolution; fixed built-in AuthManager path; optional
  `TargetModel` for execution model only—**does not** change outward requested-model metadata.

Routers run in **descending plugin priority** (tie-break: ascending plugin ID). Panic, error,
invalid target, or unavailable executor/provider → log and **fall through to the next router**;
if none handle, use the original provider+auth flow.
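
The ordering and fall-through rules above can be sketched as follows. This is a minimal illustration, not the host's implementation: `router`, `routeResponse`, `normalize`, `safeRoute`, and `pickRoute` are hypothetical names, the field types are assumed, the `codex` provider key is a placeholder, and the executor/provider availability checks are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// routeResponse mirrors the fields in the table above; the Go types are assumed.
type routeResponse struct {
	Handled     bool
	TargetKind  string // "self", "executor", or "provider"
	Target      string
	TargetModel string
	Reason      string
}

// router is a hypothetical stand-in for one loaded ModelRouter plugin.
type router struct {
	ID       string
	Priority int
	Route    func(model string) (routeResponse, error)
}

// normalize enforces the single-slot rule and reports whether the target is usable.
// Availability checks for executors and providers are omitted from this sketch.
func normalize(routerID string, resp *routeResponse) bool {
	switch resp.TargetKind {
	case "self":
		resp.Target = routerID // self always resolves to the router's own plugin ID
		return true
	case "executor", "provider":
		return resp.Target != ""
	default:
		return false
	}
}

// safeRoute turns a router panic into an error so the caller can fall through.
func safeRoute(r router, model string) (resp routeResponse, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("router %s panicked: %v", r.ID, rec)
		}
	}()
	return r.Route(model)
}

// pickRoute tries routers in descending priority (ties: ascending ID), skipping
// skipID. ok=false means no router handled the request.
func pickRoute(routers []router, model, skipID string) (routeResponse, bool) {
	ordered := append([]router(nil), routers...)
	sort.SliceStable(ordered, func(i, j int) bool {
		if ordered[i].Priority != ordered[j].Priority {
			return ordered[i].Priority > ordered[j].Priority
		}
		return ordered[i].ID < ordered[j].ID
	})
	for _, r := range ordered {
		if r.ID == skipID {
			continue
		}
		resp, err := safeRoute(r, model)
		if err != nil || !resp.Handled || !normalize(r.ID, &resp) {
			continue // declined, errored, panicked, or invalid: try the next router
		}
		return resp, true
	}
	return routeResponse{}, false
}

func main() {
	routers := []router{
		{ID: "fallback", Priority: 1, Route: func(string) (routeResponse, error) {
			return routeResponse{Handled: true, TargetKind: "provider", Target: "codex"}, nil
		}},
		{ID: "flaky", Priority: 9, Route: func(string) (routeResponse, error) {
			panic("boom")
		}},
	}
	resp, ok := pickRoute(routers, "some-model", "")
	fmt.Println(resp.TargetKind, resp.Target, ok)
}
```

When `ok` is false the caller would continue with the original provider+auth flow. Passing a non-empty `skipID` models the `SkipRouterPluginID` behavior for nested host callbacks.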

## Context exposed to routers

`ModelRouteRequest` includes:

- `SourceFormat`, `RequestedModel`, `Stream`
- `Headers`, `Query`, `Body` (defensive copies)
- `Metadata` (best-effort read-only context snapshot)
- `AvailableProviders`: built-in provider keys with at least one **non-disabled** auth
  (`AuthManager.AvailableProviders()`). **Does not** reflect per-model cooldown or transient
  unavailability—treat as an optimistic snapshot.

Adds `AuthManager.HasProviderAuth()` and `AvailableProviders()`, excluding `Disabled` and
`StatusDisabled` auths consistently with credential selection.
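
A rough sketch of the filtering rule described above, assuming a simplified auth record. `authRecord`, `usable`, and the lowercase helper names are hypothetical, the string `"disabled"` stands in for `StatusDisabled`, and the sorted output order is an assumption made only to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// authRecord is a hypothetical, reduced view of a stored credential.
type authRecord struct {
	Provider string
	Disabled bool
	Status   string // "disabled" stands in for StatusDisabled in this sketch
}

// usable reports whether an auth would be considered for credential selection.
func usable(a authRecord) bool {
	return !a.Disabled && a.Status != "disabled"
}

// availableProviders lists provider keys that have at least one usable auth.
// It says nothing about per-model cooldown or transient failures.
func availableProviders(auths []authRecord) []string {
	seen := make(map[string]struct{})
	for _, a := range auths {
		if usable(a) {
			seen[a.Provider] = struct{}{}
		}
	}
	out := make([]string, 0, len(seen))
	for provider := range seen {
		out = append(out, provider)
	}
	sort.Strings(out)
	return out
}

// hasProviderAuth reports whether one provider has at least one usable auth.
func hasProviderAuth(auths []authRecord, provider string) bool {
	for _, a := range auths {
		if a.Provider == provider && usable(a) {
			return true
		}
	}
	return false
}

func main() {
	auths := []authRecord{
		{Provider: "codex"},
		{Provider: "claude", Disabled: true},
		{Provider: "claude", Status: "disabled"},
	}
	fmt.Println(availableProviders(auths), hasProviderAuth(auths, "claude"))
}
```

Because the snapshot is optimistic, a router that picks a provider from this list should still expect the route to fail at execution time.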

## Host and RPC

- Go plugins: `pluginapi.ModelRouter` + `RouteModel()`.
- RPC plugins: `pluginabi.MethodModelRoute` (`model.route`), capability flag `model_router`.
- `pluginhost.Host` implements `RouteModel` / `RouteModelExcept`; handlers use
  `SetModelRouterHost` or a `PluginHost` type assertion; **direct executor** paths use
  `ExecutePluginExecutor*` / `CountPluginExecutor`.
- No bundled example ModelRouter plugin; capability is active only when a third-party plugin
  declares `model_router` and loads.

## Plugin RPC schema (policy A, upstream-aligned)

- `pluginabi.SchemaVersion` stays **1**: capability additions (`model_router`, `model.route`)
  do not bump the number; increment only on breaking RPC JSON changes.
- Host sends `schema_version` at register; reject only if the plugin declares a **higher**
  version than the host.
- No unpublished “ModelRouter requires schema ≥ 3” gate (v3 single-slot API was never public).
- Existing plugins and examples without `model_router` (`schema_version: 1`) need no changes.
- RPC ModelRouter: `schema_version: 1` + `model_router: true` + implement `model.route`.
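
The version policy above amounts to a one-sided comparison. The sketch below assumes a flat JSON registration payload; only the `schema_version` and `model_router` names come from this commit message, while `registration`, `checkRegistration`, and the payload layout are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hostSchemaVersion matches the statement that SchemaVersion stays at 1.
const hostSchemaVersion = 1

// registration is a hypothetical subset of what a plugin declares at register time.
type registration struct {
	SchemaVersion int  `json:"schema_version"`
	ModelRouter   bool `json:"model_router"`
}

// checkRegistration rejects a plugin only when it declares a newer schema
// than the host; capability flags never affect the version check.
func checkRegistration(raw []byte) (registration, error) {
	var reg registration
	if err := json.Unmarshal(raw, &reg); err != nil {
		return registration{}, fmt.Errorf("decode registration: %w", err)
	}
	if reg.SchemaVersion > hostSchemaVersion {
		return registration{}, fmt.Errorf("plugin schema %d is newer than host schema %d",
			reg.SchemaVersion, hostSchemaVersion)
	}
	return reg, nil
}

func main() {
	reg, err := checkRegistration([]byte(`{"schema_version":1,"model_router":true}`))
	fmt.Println(reg.ModelRouter, err)

	_, err = checkRegistration([]byte(`{"schema_version":2}`))
	fmt.Println(err)
}
```

Under this policy a plugin without `model_router` registers exactly as before, and the host calls `model.route` only for plugins that set the flag.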

## Path consistency within this commit

- Provider routes reuse image-only model checks (e.g. `gpt-image-2`) on the normalized model,
  same as the default AuthManager path.
- `count` aligned with execute/stream: `SkipRouterPluginID`, query/headers injection,
  interceptor skip semantics.
- Handlers: `modelRoutersEnabled` treats hosts without `HasModelRouters` as disabled
  (same as before ModelRouter existed); `pluginhost.Host` implements the detector.
- API docs: `ModelRouter` explicitly includes built-in **provider** targets (in addition to
  plugin executors and the router’s own executor).

## Testing

go test ./internal/pluginhost ./sdk/api/handlers ./sdk/pluginapi ./sdk/pluginabi ./sdk/cliproxy/auth
go build -o test-output ./cmd/server && rm test-output
go test ./...

* fix(handlers): address ModelRouter review feedback

- Use modelExecutionQuery for plugin executor and AuthManager paths so
  inbound URL query matches router/header behavior
- Guard queryFromContext when gin Request.URL is nil
- Read plugin executor stream chunks via nextStreamChunk to exit on cancel
- Drop redundant clonePluginMetadata on capability record meta

Tests cover query propagation, stream cancel, and nil URL safety.

* feat(plugin): add Claude web search router example

Add a Claude Code web_search ModelRouter example that can route matching Claude requests through Antigravity, Codex, xAI, or Tavily.

The plugin includes executor orchestration, backend fallback/penalty handling, Tavily API key support, Claude-compatible response assembly, stream forwarding, and focused unit coverage for detection, fallback routing, model resolution, penalties, stream forwarding, and Tavily behavior.

Verification: `go test -count=1 ./...` in `examples/plugin/claude-web-search-router/go`; `go build -buildmode=c-shared` for the plugin; `go build ./cmd/server`; live local CPA curl coverage for plugin load, four explicit routes, fallback, and Codex spark routing.

* fix(pluginhost): validate executor routes before fallback

* fix(pluginhost): skip oauth-only executor routes
2026-06-16 19:15:34 +08:00


package handlers

import (
	"errors"
	"net/http"
	"net/url"

	"github.com/router-for-me/CLIProxyAPI/v7/internal/interfaces"
	"golang.org/x/net/context"
)

const (
	modelExecutionMetadataSourceKey = "source"
	modelExecutionInternalSource    = "plugin_host_model_callback"
)

type modelExecutionOptions struct {
	Headers                 http.Header
	Query                   url.Values
	InternalSource          bool
	SkipInterceptorPluginID string
	SkipRouterPluginID      string
}
// ModelExecutionRequest describes an internal model execution request.
type ModelExecutionRequest struct {
	EntryProtocol           string
	ExitProtocol            string
	Model                   string
	Stream                  bool
	Body                    []byte
	Headers                 http.Header
	Query                   url.Values
	Alt                     string
	SkipInterceptorPluginID string
	SkipRouterPluginID      string
}

// ModelExecutionResponse describes a non-streaming internal model execution response.
type ModelExecutionResponse struct {
	StatusCode int
	Headers    http.Header
	Body       []byte
}

// ModelExecutionStream describes a streaming internal model execution response.
type ModelExecutionStream struct {
	StatusCode int
	Headers    http.Header
	Chunks     <-chan ModelExecutionChunk
}

// ModelExecutionChunk carries either a streaming payload or a terminal stream error.
type ModelExecutionChunk struct {
	Payload []byte
	Err     *ModelExecutionStreamError
}

// ModelExecutionStreamError carries a JSON-friendly terminal stream error.
type ModelExecutionStreamError struct {
	StatusCode int         `json:"status_code"`
	Message    string      `json:"message"`
	Headers    http.Header `json:"headers"`
}

// Error returns the stream error message or the HTTP status text.
func (e *ModelExecutionStreamError) Error() string {
	if e == nil {
		return ""
	}
	if e.Message != "" {
		return e.Message
	}
	return http.StatusText(e.StatusCode)
}
// ExecuteModel executes an internal non-streaming model request.
// Host model callbacks are non-recursive for their caller: when
// skip plugin IDs are set, that plugin's interceptors and router are skipped
// for the nested model execution while other plugins may still run.
func (h *BaseAPIHandler) ExecuteModel(ctx context.Context, req ModelExecutionRequest) (ModelExecutionResponse, *interfaces.ErrorMessage) {
	if req.Stream {
		return ModelExecutionResponse{}, modelExecutionModeError("ExecuteModel requires Stream=false")
	}
	body, headers, errMsg := h.executeWithAuthManagerFormats(ctx, req.EntryProtocol, req.ExitProtocol, req.Model, cloneBytes(req.Body), req.Alt, false, modelExecutionOptions{
		Headers:                 req.Headers,
		Query:                   req.Query,
		InternalSource:          true,
		SkipInterceptorPluginID: req.SkipInterceptorPluginID,
		SkipRouterPluginID:      req.SkipRouterPluginID,
	})
	if errMsg != nil {
		return ModelExecutionResponse{}, errMsg
	}
	return ModelExecutionResponse{
		StatusCode: http.StatusOK,
		Headers:    cloneHeader(headers),
		Body:       cloneBytes(body),
	}, nil
}

// ExecuteModelStream executes an internal streaming model request.
// Host model callbacks are non-recursive for their caller: when
// skip plugin IDs are set, that plugin's interceptors and router are skipped
// for the nested model execution while other plugins may still run.
func (h *BaseAPIHandler) ExecuteModelStream(ctx context.Context, req ModelExecutionRequest) (ModelExecutionStream, *interfaces.ErrorMessage) {
	if !req.Stream {
		return ModelExecutionStream{}, modelExecutionModeError("ExecuteModelStream requires Stream=true")
	}
	dataChan, headers, errChan := h.executeStreamWithAuthManagerFormats(ctx, req.EntryProtocol, req.ExitProtocol, req.Model, cloneBytes(req.Body), req.Alt, false, modelExecutionOptions{
		Headers:                 req.Headers,
		Query:                   req.Query,
		InternalSource:          true,
		SkipInterceptorPluginID: req.SkipInterceptorPluginID,
		SkipRouterPluginID:      req.SkipRouterPluginID,
	})
	chunks, errMsg := prepareModelExecutionStream(ctx, dataChan, errChan)
	if errMsg != nil {
		return ModelExecutionStream{}, errMsg
	}
	return ModelExecutionStream{
		StatusCode: http.StatusOK,
		Headers:    cloneHeader(headers),
		Chunks:     chunks,
	}, nil
}
// modelExecutionModeError reports a Stream flag that does not match the
// entry point being called, as a 400 Bad Request.
func modelExecutionModeError(message string) *interfaces.ErrorMessage {
	return &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: errors.New(message)}
}

// modelExecutionResponseProtocol returns the exit protocol, defaulting to the
// entry protocol when no exit protocol is set.
func modelExecutionResponseProtocol(entryProtocol, exitProtocol string) string {
	if exitProtocol == "" {
		return entryProtocol
	}
	return exitProtocol
}

// modelExecutionHeaders prefers explicitly provided headers (cloned) and
// otherwise falls back to the headers carried by the request context.
func modelExecutionHeaders(ctx context.Context, headers http.Header) http.Header {
	if len(headers) > 0 {
		return cloneHeader(headers)
	}
	return headersFromContext(ctx)
}

// modelExecutionQuery prefers an explicitly provided query and otherwise falls
// back to the inbound query embedded in the request context. This lets model
// routers observe query parameters for plain HTTP requests even when callers
// do not populate execOptions.Query (mirrors modelExecutionHeaders).
func modelExecutionQuery(ctx context.Context, query url.Values) url.Values {
	if len(query) > 0 {
		return cloneURLValues(query)
	}
	return queryFromContext(ctx)
}

// cloneURLValues returns a deep copy of src so callers cannot mutate the
// original value slices. A nil input stays nil.
func cloneURLValues(src url.Values) url.Values {
	if src == nil {
		return nil
	}
	dst := make(url.Values, len(src))
	for key, values := range src {
		dst[key] = append([]string(nil), values...)
	}
	return dst
}

// addModelExecutionSourceMetadata tags meta as originating from a plugin host
// model callback. It is a no-op for external requests or a nil map.
func addModelExecutionSourceMetadata(meta map[string]any, internalSource bool) {
	if !internalSource || meta == nil {
		return
	}
	meta[modelExecutionMetadataSourceKey] = modelExecutionInternalSource
}
// prepareModelExecutionStream waits for the first chunk or error so that a
// failure before any data can be returned as a plain error, then wraps the
// remaining stream in a single chunk channel.
func prepareModelExecutionStream(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage) (<-chan ModelExecutionChunk, *interfaces.ErrorMessage) {
	pending, nextDataChan, nextErrChan, errMsg := receiveInitialModelExecutionChunk(ctx, dataChan, errChan)
	if errMsg != nil {
		return nil, errMsg
	}
	return wrapModelExecutionChunks(ctx, nextDataChan, nextErrChan, pending), nil
}

// receiveInitialModelExecutionChunk blocks until the first payload, the first
// non-nil error, context cancellation, or closure of both channels. A closed
// channel is set to nil so its select case is never chosen again.
func receiveInitialModelExecutionChunk(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage) ([]ModelExecutionChunk, <-chan []byte, <-chan *interfaces.ErrorMessage, *interfaces.ErrorMessage) {
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}
	for dataChan != nil || errChan != nil {
		select {
		case payload, ok := <-dataChan:
			if !ok {
				dataChan = nil
				continue
			}
			return []ModelExecutionChunk{{Payload: cloneBytes(payload)}}, dataChan, errChan, nil
		case errMsg, ok := <-errChan:
			if !ok {
				errChan = nil
				continue
			}
			if errMsg != nil {
				return nil, dataChan, errChan, errMsg
			}
		case <-done:
			return nil, dataChan, errChan, nil
		}
	}
	return nil, dataChan, errChan, nil
}

// wrapModelExecutionChunks forwards the pending chunks and then the remaining
// payloads on one channel. A non-nil error is sent as a terminal chunk. The
// channel is closed on context cancellation, after a terminal error, or once
// both source channels are closed.
func wrapModelExecutionChunks(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage, pending []ModelExecutionChunk) <-chan ModelExecutionChunk {
	chunks := make(chan ModelExecutionChunk)
	go func() {
		defer close(chunks)
		var done <-chan struct{}
		if ctx != nil {
			done = ctx.Done()
		}
		for _, chunk := range pending {
			if !sendModelExecutionChunk(ctx, chunks, chunk) {
				return
			}
		}
		for dataChan != nil || errChan != nil {
			select {
			case <-done:
				return
			case payload, ok := <-dataChan:
				if !ok {
					dataChan = nil
					continue
				}
				if !sendModelExecutionChunk(ctx, chunks, ModelExecutionChunk{Payload: cloneBytes(payload)}) {
					return
				}
			case errMsg, ok := <-errChan:
				if !ok {
					errChan = nil
					continue
				}
				if errMsg != nil {
					_ = sendModelExecutionChunk(ctx, chunks, ModelExecutionChunk{Err: modelExecutionStreamErrorFromMessage(errMsg)})
					return
				}
			}
		}
	}()
	return chunks
}
// modelExecutionStreamErrorFromMessage converts an ErrorMessage into the
// JSON-friendly stream error, copying its Addon headers.
func modelExecutionStreamErrorFromMessage(errMsg *interfaces.ErrorMessage) *ModelExecutionStreamError {
	if errMsg == nil {
		return nil
	}
	message := ""
	if errMsg.Error != nil {
		message = errMsg.Error.Error()
	}
	return &ModelExecutionStreamError{
		StatusCode: errMsg.StatusCode,
		Message:    message,
		Headers:    cloneHeader(errMsg.Addon),
	}
}

// sendModelExecutionChunk delivers chunk to the consumer and reports whether
// it was delivered. With a non-nil ctx it gives up when ctx is cancelled.
func sendModelExecutionChunk(ctx context.Context, chunks chan<- ModelExecutionChunk, chunk ModelExecutionChunk) bool {
	if ctx == nil {
		chunks <- chunk
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case chunks <- chunk:
		return true
	}
}
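
A caller of `ExecuteModelStream` has to drain `Chunks` and treat a chunk with a non-nil `Err` as the end of the stream. The sketch below shows one way to do that under simplifying assumptions: `chunk` and `streamError` are reduced local copies of `ModelExecutionChunk` and `ModelExecutionStreamError`, and `collect` is a hypothetical helper that is not part of this file.

```go
package main

import (
	"bytes"
	"fmt"
)

// streamError is a reduced stand-in for ModelExecutionStreamError.
type streamError struct {
	StatusCode int
	Message    string
}

func (e *streamError) Error() string { return e.Message }

// chunk is a reduced stand-in for ModelExecutionChunk.
type chunk struct {
	Payload []byte
	Err     *streamError
}

// collect concatenates payloads until the channel closes or a terminal
// error chunk arrives. Data received before the error is still returned.
func collect(chunks <-chan chunk) ([]byte, error) {
	var buf bytes.Buffer
	for c := range chunks {
		if c.Err != nil {
			return buf.Bytes(), c.Err
		}
		buf.Write(c.Payload)
	}
	return buf.Bytes(), nil
}

func main() {
	chunks := make(chan chunk)
	go func() {
		defer close(chunks)
		chunks <- chunk{Payload: []byte("hello ")}
		chunks <- chunk{Payload: []byte("world")}
		chunks <- chunk{Err: &streamError{StatusCode: 502, Message: "upstream closed"}}
	}()
	body, err := collect(chunks)
	fmt.Printf("%q %v\n", body, err)
}
```

Returning as soon as the error chunk arrives is safe with the producer above, because `wrapModelExecutionChunks` stops and closes the channel right after sending a terminal error.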