mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-24 03:28:23 +08:00
* feat(plugin): add ModelRouter before auth with single-slot routing targets

## Motivation

Plugins that need to change execution based on the **original inbound request** (protocol format, raw body, headers, query, stream flag, metadata, etc.) often resorted to virtual/trampoline models or routing inside interceptors.

This commit adds **ModelRouter**: a pluggable layer **before** model-to-provider resolution and AuthManager credential selection, so plugins can declare who executes a request without spoofing the client model name.

This is a **new capability**, not a bugfix on the existing chain. With no ModelRouter plugins loaded, behavior matches upstream.

## Pipeline placement

- `execute`, `stream`, and `count` (and image paths via AuthManager) call `applyModelRouter()` before building `coreexecutor.Request`.
- Routing runs **before** the request interceptor (before auth), so routers see the client’s original context. After a plugin executor is chosen, the existing **after-auth interceptor → response/stream interceptor** chain still applies.
- Internal `ExecuteModel` / `ExecuteModelStream` (host callbacks) support `SkipRouterPluginID` so nested calls do not re-enter the same router.

## Routing API (single slot, mutually exclusive)

`ModelRouteResponse` uses **one target slot** to avoid ambiguity when both `TargetExecutorPluginID` and `TargetProvider` were set and the host ignored one:

| Field | Meaning |
|-------|---------|
| `Handled` | `false`: this router declines; try the next router or default path |
| `TargetKind` | `self` \| `executor` \| `provider` (pick one) |
| `Target` | `self`/`executor`: plugin ID; `provider`: built-in provider key |
| `TargetModel` | Optional on `provider` only; empty keeps client `RequestedModel` |
| `Reason` | Optional diagnostic text |

- **self**: the router plugin’s own executor (`Target` normalized to the router’s plugin ID).
- **executor**: another plugin’s executor; host pre-checks with `executorPluginReady()` (executor declared and provider identifier resolvable) to avoid handled routes that 500 at execution.
- **provider**: skip registry model resolution; fixed built-in AuthManager path; optional `TargetModel` for the execution model only; it **does not** change outward requested-model metadata.

Routers run in **descending plugin priority** (tie-break: ascending plugin ID). Panic, error, invalid target, or unavailable executor/provider → log and **fall through to the next router**; if none handle, use the original provider+auth flow.

## Context exposed to routers

`ModelRouteRequest` includes:

- `SourceFormat`, `RequestedModel`, `Stream`
- `Headers`, `Query`, `Body` (defensive copies)
- `Metadata` (best-effort read-only context snapshot)
- `AvailableProviders`: built-in provider keys with at least one **non-disabled** auth (`AuthManager.AvailableProviders()`). **Does not** reflect per-model cooldown or transient unavailability; treat it as an optimistic snapshot.

Adds `AuthManager.HasProviderAuth()` and `AvailableProviders()`, excluding `Disabled` and `StatusDisabled` auths consistently with credential selection.

## Host and RPC

- Go plugins: `pluginapi.ModelRouter` + `RouteModel()`.
- RPC plugins: `pluginabi.MethodModelRoute` (`model.route`), capability flag `model_router`.
- `pluginhost.Host` implements `RouteModel` / `RouteModelExcept`; handlers use `SetModelRouterHost` or a `PluginHost` type assertion; **direct executor** paths use `ExecutePluginExecutor*` / `CountPluginExecutor`.
- No bundled example ModelRouter plugin; capability is active only when a third-party plugin declares `model_router` and loads.

## Plugin RPC schema (policy A, upstream-aligned)

- `pluginabi.SchemaVersion` stays **1**: capability additions (`model_router`, `model.route`) do not bump the number; increment only on breaking RPC JSON changes.
- Host sends `schema_version` at register; reject only if the plugin declares a **higher** version than the host.
- No unpublished “ModelRouter requires schema ≥ 3” gate (v3 single-slot API was never public).
- Existing plugins and examples without `model_router` (`schema_version: 1`) need no changes.
- RPC ModelRouter: `schema_version: 1` + `model_router: true` + implement `model.route`.

## Path consistency within this commit

- Provider routes reuse image-only model checks (e.g. `gpt-image-2`) on the normalized model, same as the default AuthManager path.
- `count` aligned with execute/stream: `SkipRouterPluginID`, query/headers injection, interceptor skip semantics.
- Handlers: `modelRoutersEnabled` treats hosts without `HasModelRouters` as disabled (same as before ModelRouter existed); `pluginhost.Host` implements the detector.
- API docs: `ModelRouter` explicitly includes built-in **provider** targets (in addition to plugin executors and the router’s own executor).

## Testing

    go test ./internal/pluginhost ./sdk/api/handlers ./sdk/pluginapi ./sdk/pluginabi ./sdk/cliproxy/auth
    go build -o test-output ./cmd/server && rm test-output
    go test ./...

* fix(handlers): address ModelRouter review feedback

- Use `modelExecutionQuery` for plugin executor and AuthManager paths so inbound URL query matches router/header behavior
- Guard `queryFromContext` when gin `Request.URL` is nil
- Read plugin executor stream chunks via `nextStreamChunk` to exit on cancel
- Drop redundant `clonePluginMetadata` on capability record meta

Tests cover query propagation, stream cancel, and nil URL safety.

* feat(plugin): add Claude web search router example

Add a Claude Code web_search ModelRouter example that can route matching Claude requests through Antigravity, Codex, xAI, or Tavily.

The plugin includes executor orchestration, backend fallback/penalty handling, Tavily API key support, Claude-compatible response assembly, stream forwarding, and focused unit coverage for detection, fallback routing, model resolution, penalties, stream forwarding, and Tavily behavior.

Verification: `go test -count=1 ./...` in `examples/plugin/claude-web-search-router/go`; `go build -buildmode=c-shared` for the plugin; `go build ./cmd/server`; live local CPA curl coverage for plugin load, four explicit routes, fallback, and Codex spark routing.

* fix(pluginhost): validate executor routes before fallback

* fix(pluginhost): skip oauth-only executor routes
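The router ordering and fall-through rules described in the first commit message above (descending priority, ascending plugin ID as tie-break, skip on panic, error, or invalid target) can be sketched roughly as follows. This is a minimal illustration, not the host's implementation: `Router`, `RouteResponse`, `validate`, `callRouter`, and `resolve` are hypothetical names, the `codex` provider key is only an example value, and the real host additionally checks executor readiness and provider availability, which this sketch omits.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// RouteResponse is a hypothetical stand-in for the single-slot ModelRouteResponse.
type RouteResponse struct {
	Handled    bool
	TargetKind string // "self", "executor", or "provider"
	Target     string // plugin ID or built-in provider key
}

// Router is a hypothetical router plugin: an ID, a priority, and a routing function.
type Router struct {
	ID       string
	Priority int
	Route    func(model string) (RouteResponse, error)
}

// validate enforces the single-slot rule: one known kind and a usable target.
func validate(routerID string, resp RouteResponse) (RouteResponse, error) {
	switch resp.TargetKind {
	case "self":
		resp.Target = routerID // normalized to the router's own plugin ID
	case "executor", "provider":
		if resp.Target == "" {
			return resp, errors.New("empty target")
		}
	default:
		return resp, fmt.Errorf("unknown target kind %q", resp.TargetKind)
	}
	return resp, nil
}

// callRouter runs one router and treats panic, error, decline, or an
// invalid target as "not handled" so the caller can fall through.
func callRouter(r Router, model string) (resp RouteResponse, ok bool) {
	defer func() {
		if rec := recover(); rec != nil {
			resp, ok = RouteResponse{}, false
		}
	}()
	out, err := r.Route(model)
	if err != nil || !out.Handled {
		return RouteResponse{}, false
	}
	if out, err = validate(r.ID, out); err != nil {
		return RouteResponse{}, false
	}
	return out, true
}

// resolve tries routers in descending priority (ties: ascending ID) and
// skips skipID, mirroring the SkipRouterPluginID idea for nested calls.
func resolve(routers []Router, model, skipID string) (RouteResponse, bool) {
	ordered := append([]Router(nil), routers...)
	sort.SliceStable(ordered, func(i, j int) bool {
		if ordered[i].Priority != ordered[j].Priority {
			return ordered[i].Priority > ordered[j].Priority
		}
		return ordered[i].ID < ordered[j].ID
	})
	for _, r := range ordered {
		if r.ID == skipID {
			continue
		}
		if resp, ok := callRouter(r, model); ok {
			return resp, true
		}
	}
	return RouteResponse{}, false // caller uses the default provider+auth flow
}

func main() {
	routers := []Router{
		{ID: "b", Priority: 10, Route: func(string) (RouteResponse, error) { panic("boom") }},
		{ID: "a", Priority: 10, Route: func(string) (RouteResponse, error) {
			return RouteResponse{Handled: true, TargetKind: "bogus"}, nil
		}},
		{ID: "c", Priority: 1, Route: func(string) (RouteResponse, error) {
			return RouteResponse{Handled: true, TargetKind: "provider", Target: "codex"}, nil
		}},
	}
	resp, ok := resolve(routers, "some-model", "")
	fmt.Println(ok, resp.TargetKind, resp.Target) // prints "true provider codex"
}
```

In this sketch router `a` is tried first (same priority as `b`, lower ID) and rejected for its unknown target kind, `b` panics and is skipped, and the lower-priority `c` finally handles the request.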
278 lines
8.3 KiB
Go
package handlers

import (
	"errors"
	"net/http"
	"net/url"

	"github.com/router-for-me/CLIProxyAPI/v7/internal/interfaces"
	"golang.org/x/net/context"
)

const (
	modelExecutionMetadataSourceKey = "source"
	modelExecutionInternalSource    = "plugin_host_model_callback"
)

type modelExecutionOptions struct {
	Headers                 http.Header
	Query                   url.Values
	InternalSource          bool
	SkipInterceptorPluginID string
	SkipRouterPluginID      string
}

// ModelExecutionRequest describes an internal model execution request.
type ModelExecutionRequest struct {
	EntryProtocol           string
	ExitProtocol            string
	Model                   string
	Stream                  bool
	Body                    []byte
	Headers                 http.Header
	Query                   url.Values
	Alt                     string
	SkipInterceptorPluginID string
	SkipRouterPluginID      string
}

// ModelExecutionResponse describes a non-streaming internal model execution response.
type ModelExecutionResponse struct {
	StatusCode int
	Headers    http.Header
	Body       []byte
}

// ModelExecutionStream describes a streaming internal model execution response.
type ModelExecutionStream struct {
	StatusCode int
	Headers    http.Header
	Chunks     <-chan ModelExecutionChunk
}

// ModelExecutionChunk carries either a streaming payload or a terminal stream error.
type ModelExecutionChunk struct {
	Payload []byte
	Err     *ModelExecutionStreamError
}

// ModelExecutionStreamError carries a JSON-friendly terminal stream error.
type ModelExecutionStreamError struct {
	StatusCode int         `json:"status_code"`
	Message    string      `json:"message"`
	Headers    http.Header `json:"headers"`
}

// Error returns the stream error message or the HTTP status text.
func (e *ModelExecutionStreamError) Error() string {
	if e == nil {
		return ""
	}
	if e.Message != "" {
		return e.Message
	}
	return http.StatusText(e.StatusCode)
}

// ExecuteModel executes an internal non-streaming model request.
// Host model callbacks are non-recursive for their caller: when
// skip plugin IDs are set, that plugin's interceptors and router are skipped
// for the nested model execution while other plugins may still run.
func (h *BaseAPIHandler) ExecuteModel(ctx context.Context, req ModelExecutionRequest) (ModelExecutionResponse, *interfaces.ErrorMessage) {
	if req.Stream {
		return ModelExecutionResponse{}, modelExecutionModeError("ExecuteModel requires Stream=false")
	}
	body, headers, errMsg := h.executeWithAuthManagerFormats(ctx, req.EntryProtocol, req.ExitProtocol, req.Model, cloneBytes(req.Body), req.Alt, false, modelExecutionOptions{
		Headers:                 req.Headers,
		Query:                   req.Query,
		InternalSource:          true,
		SkipInterceptorPluginID: req.SkipInterceptorPluginID,
		SkipRouterPluginID:      req.SkipRouterPluginID,
	})
	if errMsg != nil {
		return ModelExecutionResponse{}, errMsg
	}
	return ModelExecutionResponse{
		StatusCode: http.StatusOK,
		Headers:    cloneHeader(headers),
		Body:       cloneBytes(body),
	}, nil
}

// ExecuteModelStream executes an internal streaming model request.
// Host model callbacks are non-recursive for their caller: when
// skip plugin IDs are set, that plugin's interceptors and router are skipped
// for the nested model execution while other plugins may still run.
func (h *BaseAPIHandler) ExecuteModelStream(ctx context.Context, req ModelExecutionRequest) (ModelExecutionStream, *interfaces.ErrorMessage) {
	if !req.Stream {
		return ModelExecutionStream{}, modelExecutionModeError("ExecuteModelStream requires Stream=true")
	}
	dataChan, headers, errChan := h.executeStreamWithAuthManagerFormats(ctx, req.EntryProtocol, req.ExitProtocol, req.Model, cloneBytes(req.Body), req.Alt, false, modelExecutionOptions{
		Headers:                 req.Headers,
		Query:                   req.Query,
		InternalSource:          true,
		SkipInterceptorPluginID: req.SkipInterceptorPluginID,
		SkipRouterPluginID:      req.SkipRouterPluginID,
	})
	chunks, errMsg := prepareModelExecutionStream(ctx, dataChan, errChan)
	if errMsg != nil {
		return ModelExecutionStream{}, errMsg
	}
	return ModelExecutionStream{
		StatusCode: http.StatusOK,
		Headers:    cloneHeader(headers),
		Chunks:     chunks,
	}, nil
}

// modelExecutionModeError wraps message in a 400 Bad Request error.
func modelExecutionModeError(message string) *interfaces.ErrorMessage {
	return &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: errors.New(message)}
}

// modelExecutionResponseProtocol returns exitProtocol, falling back to
// entryProtocol when no exit protocol is set.
func modelExecutionResponseProtocol(entryProtocol, exitProtocol string) string {
	if exitProtocol == "" {
		return entryProtocol
	}
	return exitProtocol
}

// modelExecutionHeaders prefers a copy of the explicitly provided headers and
// otherwise falls back to headersFromContext.
func modelExecutionHeaders(ctx context.Context, headers http.Header) http.Header {
	if len(headers) > 0 {
		return cloneHeader(headers)
	}
	return headersFromContext(ctx)
}

// modelExecutionQuery prefers an explicitly provided query and otherwise falls
// back to the inbound query embedded in the request context. This lets model
// routers observe query parameters for plain HTTP requests even when callers
// do not populate execOptions.Query (mirrors modelExecutionHeaders).
func modelExecutionQuery(ctx context.Context, query url.Values) url.Values {
	if len(query) > 0 {
		return cloneURLValues(query)
	}
	return queryFromContext(ctx)
}

// cloneURLValues returns a deep copy of src; a nil input yields nil.
func cloneURLValues(src url.Values) url.Values {
	if src == nil {
		return nil
	}
	dst := make(url.Values, len(src))
	for key, values := range src {
		dst[key] = append([]string(nil), values...)
	}
	return dst
}

// addModelExecutionSourceMetadata marks meta as coming from a plugin host
// model callback. It is a no-op when internalSource is false or meta is nil.
func addModelExecutionSourceMetadata(meta map[string]any, internalSource bool) {
	if !internalSource || meta == nil {
		return
	}
	meta[modelExecutionMetadataSourceKey] = modelExecutionInternalSource
}

// prepareModelExecutionStream waits for the first chunk or error so that an
// error arriving before any payload is returned directly to the caller, then
// wraps the rest of the stream in a ModelExecutionChunk channel.
func prepareModelExecutionStream(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage) (<-chan ModelExecutionChunk, *interfaces.ErrorMessage) {
	pending, nextDataChan, nextErrChan, errMsg := receiveInitialModelExecutionChunk(ctx, dataChan, errChan)
	if errMsg != nil {
		return nil, errMsg
	}
	return wrapModelExecutionChunks(ctx, nextDataChan, nextErrChan, pending), nil
}

// receiveInitialModelExecutionChunk blocks until the first payload, the first
// non-nil error, context cancellation, or both channels being closed. A closed
// channel is set to nil so the select no longer picks it.
func receiveInitialModelExecutionChunk(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage) ([]ModelExecutionChunk, <-chan []byte, <-chan *interfaces.ErrorMessage, *interfaces.ErrorMessage) {
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}
	for dataChan != nil || errChan != nil {
		select {
		case payload, ok := <-dataChan:
			if !ok {
				dataChan = nil
				continue
			}
			return []ModelExecutionChunk{{Payload: cloneBytes(payload)}}, dataChan, errChan, nil
		case errMsg, ok := <-errChan:
			if !ok {
				errChan = nil
				continue
			}
			if errMsg != nil {
				return nil, dataChan, errChan, errMsg
			}
		case <-done:
			return nil, dataChan, errChan, nil
		}
	}
	return nil, dataChan, errChan, nil
}

// wrapModelExecutionChunks forwards the pending chunks and then every payload
// from dataChan to a new channel. It stops when both inputs are closed, the
// context is cancelled, or an error arrives; the error is sent as a final chunk.
func wrapModelExecutionChunks(ctx context.Context, dataChan <-chan []byte, errChan <-chan *interfaces.ErrorMessage, pending []ModelExecutionChunk) <-chan ModelExecutionChunk {
	chunks := make(chan ModelExecutionChunk)
	go func() {
		defer close(chunks)
		var done <-chan struct{}
		if ctx != nil {
			done = ctx.Done()
		}
		for _, chunk := range pending {
			if !sendModelExecutionChunk(ctx, chunks, chunk) {
				return
			}
		}
		for dataChan != nil || errChan != nil {
			select {
			case <-done:
				return
			case payload, ok := <-dataChan:
				if !ok {
					dataChan = nil
					continue
				}
				if !sendModelExecutionChunk(ctx, chunks, ModelExecutionChunk{Payload: cloneBytes(payload)}) {
					return
				}
			case errMsg, ok := <-errChan:
				if !ok {
					errChan = nil
					continue
				}
				if errMsg != nil {
					_ = sendModelExecutionChunk(ctx, chunks, ModelExecutionChunk{Err: modelExecutionStreamErrorFromMessage(errMsg)})
					return
				}
			}
		}
	}()
	return chunks
}

// modelExecutionStreamErrorFromMessage converts an ErrorMessage into a
// JSON-friendly stream error, copying the Addon headers.
func modelExecutionStreamErrorFromMessage(errMsg *interfaces.ErrorMessage) *ModelExecutionStreamError {
	if errMsg == nil {
		return nil
	}
	message := ""
	if errMsg.Error != nil {
		message = errMsg.Error.Error()
	}
	return &ModelExecutionStreamError{
		StatusCode: errMsg.StatusCode,
		Message:    message,
		Headers:    cloneHeader(errMsg.Addon),
	}
}

// sendModelExecutionChunk delivers chunk unless ctx is cancelled first and
// reports whether the chunk was delivered. With a nil ctx it blocks until the
// receiver takes the chunk.
func sendModelExecutionChunk(ctx context.Context, chunks chan<- ModelExecutionChunk, chunk ModelExecutionChunk) bool {
	if ctx == nil {
		chunks <- chunk
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case chunks <- chunk:
		return true
	}
}