CLIProxyAPI/internal/pluginhost/host.go
sususu98 87132e54d7 feat(plugin): add ModelRouter before auth with single-slot routing targets (#3865)
* feat(plugin): add ModelRouter before auth with single-slot routing targets

## Motivation

Plugins that need to change execution based on the **original inbound request**
(protocol format, raw body, headers, query, stream flag, metadata, etc.) often
resorted to virtual/trampoline models or routing inside interceptors. This
commit adds **ModelRouter**: a pluggable layer **before** model-to-provider
resolution and AuthManager credential selection, so plugins can declare who
executes a request without spoofing the client model name.

This is a **new capability**, not a bugfix on the existing chain. With no
ModelRouter plugins loaded, behavior matches upstream.

## Pipeline placement

- `execute`, `stream`, and `count` (and image paths via AuthManager) call
  `applyModelRouter()` before building `coreexecutor.Request`.
- Routing runs **before** the request interceptor (before auth), so routers see
  the client’s original context. After a plugin executor is chosen, the existing
  **after-auth interceptor → response/stream interceptor** chain still applies.
- Internal `ExecuteModel` / `ExecuteModelStream` (host callbacks) support
  `SkipRouterPluginID` so nested calls do not re-enter the same router.
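The skip rule for nested host callbacks can be sketched as follows. Only the `SkipRouterPluginID` name comes from the text above. The `request` and `router` types and the `pickRoute` helper are hypothetical simplifications, not the real `applyModelRouter()`. The sketch shows why a router's own nested `ExecuteModel` call does not loop back into that router.

```go
package main

import "fmt"

// request is a hypothetical stand-in for the inbound request; it is not the
// real handlers.ModelExecutionRequest type.
type request struct {
	Model              string
	SkipRouterPluginID string // router to bypass on nested host callbacks
}

// router is hypothetical: Route returns the executor plugin ID it picked,
// or ok=false to decline.
type router struct {
	PluginID string
	Route    func(request) (target string, ok bool)
}

// pickRoute asks each router in order, skipping the one named in
// SkipRouterPluginID so a router's own nested call cannot be routed back to
// itself. An empty result means "use the default provider+auth path".
func pickRoute(routers []router, req request) string {
	for _, r := range routers {
		if req.SkipRouterPluginID != "" && r.PluginID == req.SkipRouterPluginID {
			continue
		}
		if target, ok := r.Route(req); ok {
			return target
		}
	}
	return ""
}

func main() {
	routers := []router{{
		PluginID: "web-search",
		Route:    func(request) (string, bool) { return "web-search", true },
	}}
	// Top-level client request: the router claims it.
	fmt.Printf("%q\n", pickRoute(routers, request{Model: "claude"}))
	// Nested call issued by the web-search plugin itself: the router is skipped.
	fmt.Printf("%q\n", pickRoute(routers, request{Model: "claude", SkipRouterPluginID: "web-search"}))
}
```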

## Routing API (single slot, mutually exclusive)

`ModelRouteResponse` uses **one target slot**, avoiding the ambiguity of separate
`TargetExecutorPluginID` and `TargetProvider` fields, where setting both left the host to silently ignore one:

| Field | Meaning |
|-------|---------|
| `Handled` | `false`: this router declines; try the next router or default path |
| `TargetKind` | `self` \| `executor` \| `provider` (pick one) |
| `Target` | `self`/`executor`: plugin ID; `provider`: built-in provider key |
| `TargetModel` | Optional on `provider` only; empty keeps client `RequestedModel` |
| `Reason` | Optional diagnostic text |

- **self**: the router plugin’s own executor (`Target` normalized to the router’s plugin ID).
- **executor**: another plugin’s executor; the host pre-checks it with `executorPluginReady()`
  (executor declared and provider identifier resolvable) so that a route reported as handled does not fail with HTTP 500 at execution time.
- **provider**: skips registry model resolution and uses the built-in AuthManager path for that provider; the optional
  `TargetModel` sets the execution model only and **does not** change the outward requested-model metadata.

Routers run in **descending plugin priority** (tie-break: ascending plugin ID). Panic, error,
invalid target, or unavailable executor/provider → log and **fall through to the next router**;
if none handle, use the original provider+auth flow.
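The ordering and fall-through rules can be illustrated with a self-contained sketch. `routerEntry`, `orderRouters`, `callRouter`, and `firstHandled` are hypothetical names; the real host sorts capability records in its snapshot and logs instead of printing.

```go
package main

import (
	"fmt"
	"sort"
)

// routerEntry is a hypothetical record of one loaded router plugin.
type routerEntry struct {
	ID       string
	Priority int
	Route    func() (target string, handled bool, err error)
}

// orderRouters sorts in place by descending priority, breaking ties by
// ascending plugin ID.
func orderRouters(rs []routerEntry) {
	sort.SliceStable(rs, func(i, j int) bool {
		if rs[i].Priority != rs[j].Priority {
			return rs[i].Priority > rs[j].Priority
		}
		return rs[i].ID < rs[j].ID
	})
}

// callRouter turns a router panic into an ordinary error.
func callRouter(r routerEntry) (target string, handled bool, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			target, handled, err = "", false, fmt.Errorf("router %s panicked: %v", r.ID, rec)
		}
	}()
	return r.Route()
}

// firstHandled returns the first handled target; panics, errors and declines
// fall through. ok=false means "use the original provider+auth flow".
func firstHandled(rs []routerEntry) (target string, ok bool) {
	orderRouters(rs)
	for _, r := range rs {
		t, handled, err := callRouter(r)
		if err != nil {
			fmt.Println("router skipped:", err) // stands in for the host's log line
			continue
		}
		if handled {
			return t, true
		}
	}
	return "", false
}

func main() {
	entries := []routerEntry{
		{ID: "c", Priority: 1, Route: func() (string, bool, error) { return "exec-c", true, nil }},
		{ID: "b", Priority: 10, Route: func() (string, bool, error) { panic("boom") }},
		{ID: "a", Priority: 10, Route: func() (string, bool, error) { return "", false, nil }},
	}
	fmt.Println(firstHandled(entries))
}
```

Here `a` runs first (same priority as `b`, lower ID) and declines, `b` panics and is skipped, and the lower-priority `c` finally handles the request.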

## Context exposed to routers

`ModelRouteRequest` includes:

- `SourceFormat`, `RequestedModel`, `Stream`
- `Headers`, `Query`, `Body` (defensive copies)
- `Metadata` (best-effort read-only context snapshot)
- `AvailableProviders`: built-in provider keys with at least one **non-disabled** auth
  (`AuthManager.AvailableProviders()`). **Does not** reflect per-model cooldown or transient
  unavailability—treat as an optimistic snapshot.

Adds `AuthManager.HasProviderAuth()` and `AvailableProviders()`; both exclude `Disabled` and
`StatusDisabled` auths, consistent with credential selection.

## Host and RPC

- Go plugins: `pluginapi.ModelRouter` + `RouteModel()`.
- RPC plugins: `pluginabi.MethodModelRoute` (`model.route`), capability flag `model_router`.
- `pluginhost.Host` implements `RouteModel` / `RouteModelExcept`; handlers use
  `SetModelRouterHost` or a `PluginHost` type assertion; **direct executor** paths use
  `ExecutePluginExecutor*` / `CountPluginExecutor`.
- No bundled example ModelRouter plugin; capability is active only when a third-party plugin
  declares `model_router` and loads.

## Plugin RPC schema (policy A, upstream-aligned)

- `pluginabi.SchemaVersion` stays **1**: capability additions (`model_router`, `model.route`)
  do not bump the number; increment only on breaking RPC JSON changes.
- Host sends `schema_version` at register; reject only if the plugin declares a **higher**
  version than the host.
- No “ModelRouter requires schema ≥ 3” gate: the v3 single-slot API was never published.
- Existing plugins and examples without `model_router` (`schema_version: 1`) need no changes.
- RPC ModelRouter: `schema_version: 1` + `model_router: true` + implement `model.route`.
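The "reject only if newer" policy reduces to one comparison. The JSON shape below, with `model_router` nested under `capabilities`, is a hypothetical layout chosen for illustration; only the `schema_version` and `model_router` names come from the text, and `checkRegister` is not a real host function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hostSchemaVersion mirrors pluginabi.SchemaVersion, which stays 1.
const hostSchemaVersion = 1

// registerReply is a hypothetical register response shape.
type registerReply struct {
	SchemaVersion int `json:"schema_version"`
	Capabilities  struct {
		ModelRouter bool `json:"model_router"`
	} `json:"capabilities"`
}

// checkRegister rejects only plugins declaring a newer schema than the host;
// equal or older versions load, with or without model_router.
func checkRegister(raw []byte) (registerReply, error) {
	var reply registerReply
	if err := json.Unmarshal(raw, &reply); err != nil {
		return registerReply{}, err
	}
	if reply.SchemaVersion > hostSchemaVersion {
		return registerReply{}, fmt.Errorf("plugin schema %d is newer than host schema %d",
			reply.SchemaVersion, hostSchemaVersion)
	}
	return reply, nil
}

func main() {
	for _, raw := range []string{
		`{"schema_version":1,"capabilities":{"model_router":true}}`,
		`{"schema_version":2}`,
	} {
		reply, err := checkRegister([]byte(raw))
		fmt.Println(reply.Capabilities.ModelRouter, err)
	}
}
```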

## Path consistency within this commit

- Provider routes reuse image-only model checks (e.g. `gpt-image-2`) on the normalized model,
  same as the default AuthManager path.
- `count` aligned with execute/stream: `SkipRouterPluginID`, query/headers injection,
  interceptor skip semantics.
- Handlers: `modelRoutersEnabled` treats hosts without `HasModelRouters` as disabled
  (same as before ModelRouter existed); `pluginhost.Host` implements the detector.
- API docs: `ModelRouter` explicitly includes built-in **provider** targets (in addition to
  plugin executors and the router’s own executor).
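The detector fallback can be expressed as an optional-interface type assertion. The interface name and the `modelRoutersEnabled` signature below are assumptions; the text only states that hosts without `HasModelRouters` are treated as having routing disabled.

```go
package main

import "fmt"

// modelRouterDetector is a hypothetical optional interface.
type modelRouterDetector interface {
	HasModelRouters() bool
}

// modelRoutersEnabled treats any host lacking the detector as having no
// routers, which reproduces the pre-ModelRouter behavior.
func modelRoutersEnabled(host any) bool {
	d, ok := host.(modelRouterDetector)
	return ok && d.HasModelRouters()
}

// legacyHost predates ModelRouter and has no detector method.
type legacyHost struct{}

// routerHost implements the detector.
type routerHost struct{ routers int }

func (h routerHost) HasModelRouters() bool { return h.routers > 0 }

func main() {
	fmt.Println(modelRoutersEnabled(legacyHost{}))           // false
	fmt.Println(modelRoutersEnabled(routerHost{routers: 1})) // true
	fmt.Println(modelRoutersEnabled(nil))                    // false
}
```

A type assertion keeps older host implementations compiling unchanged: they simply fail the assertion and take the default path.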

## Testing

- `go test ./internal/pluginhost ./sdk/api/handlers ./sdk/pluginapi ./sdk/pluginabi ./sdk/cliproxy/auth`
- `go build -o test-output ./cmd/server && rm test-output`
- `go test ./...`

* fix(handlers): address ModelRouter review feedback

- Use modelExecutionQuery for plugin executor and AuthManager paths so
  inbound URL query matches router/header behavior
- Guard queryFromContext when gin Request.URL is nil
- Read plugin executor stream chunks via nextStreamChunk to exit on cancel
- Drop redundant clonePluginMetadata on capability record meta

Tests cover query propagation, stream cancel, and nil URL safety.

* feat(plugin): add Claude web search router example

Add a Claude Code web_search ModelRouter example that can route matching Claude requests through Antigravity, Codex, xAI, or Tavily.

The plugin includes executor orchestration, backend fallback/penalty handling, Tavily API key support, Claude-compatible response assembly, stream forwarding, and focused unit coverage for detection, fallback routing, model resolution, penalties, stream forwarding, and Tavily behavior.

Verification: go test -count=1 ./... in examples/plugin/claude-web-search-router/go; go build -buildmode=c-shared for the plugin; go build ./cmd/server; live local CPA curl coverage for plugin load, four explicit routes, fallback, and Codex spark routing.

* fix(pluginhost): validate executor routes before fallback

* fix(pluginhost): skip oauth-only executor routes
2026-06-16 19:15:34 +08:00


package pluginhost

import (
	"context"
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
	"github.com/router-for-me/CLIProxyAPI/v7/internal/interfaces"
	"github.com/router-for-me/CLIProxyAPI/v7/sdk/api/handlers"
	coreauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
	"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginabi"
	"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
	log "github.com/sirupsen/logrus"
)

type loadedPlugin struct {
	id         string
	path       string
	registered bool
	client     pluginClient
}

type modelExecutor interface {
	ExecuteModel(context.Context, handlers.ModelExecutionRequest) (handlers.ModelExecutionResponse, *interfaces.ErrorMessage)
	ExecuteModelStream(context.Context, handlers.ModelExecutionRequest) (handlers.ModelExecutionStream, *interfaces.ErrorMessage)
}

type pluginUnloadTarget struct {
	id     string
	path   string
	client pluginClient
}

type Host struct {
	mu                     sync.Mutex
	loader                 pluginLoader
	loaded                 map[string]*loadedPlugin
	fused                  map[string]string
	runtimeConfig          *config.Config
	authManager            *coreauth.Manager
	modelExecutor          modelExecutor
	modelClientIDs         map[string]struct{}
	executorModelClientIDs map[string]struct{}
	modelProviders         map[string]string
	modelRegistrations     map[string]pluginModelRegistration
	providerModels         map[string][]*registryModelInfo
	executorProviders      map[string]struct{}
	accessProviderKeys     map[string]struct{}
	commandLineFlags       map[string]commandLineFlagRecord
	commandLineHits        map[string]struct{}
	managementRoutes       map[string]managementRouteRecord
	resourceRoutes         map[string]resourceRouteRecord
	streams                *streamBridge
	httpStreams            *hostHTTPStreamBridge
	modelStreams           *modelStreamBridge
	callbackContexts       *callbackContextRegistry
	snapshot               atomic.Value
}

// New returns a Host with empty runtime state and the default plugin loader.
func New() *Host {
	h := &Host{
		loader:                 defaultPluginLoader(),
		loaded:                 make(map[string]*loadedPlugin),
		fused:                  make(map[string]string),
		modelClientIDs:         make(map[string]struct{}),
		executorModelClientIDs: make(map[string]struct{}),
		modelProviders:         make(map[string]string),
		modelRegistrations:     make(map[string]pluginModelRegistration),
		providerModels:         make(map[string][]*registryModelInfo),
		executorProviders:      make(map[string]struct{}),
		accessProviderKeys:     make(map[string]struct{}),
		commandLineFlags:       make(map[string]commandLineFlagRecord),
		commandLineHits:        make(map[string]struct{}),
		managementRoutes:       make(map[string]managementRouteRecord),
		resourceRoutes:         make(map[string]resourceRouteRecord),
		streams:                newStreamBridge(),
		httpStreams:            newHostHTTPStreamBridge(),
		modelStreams:           newModelStreamBridge(),
		callbackContexts:       newCallbackContextRegistry(),
	}
	h.snapshot.Store(emptySnapshot())
	return h
}

// NewForTest returns a Host that opens plugins through the supplied loader.
func NewForTest(loader pluginLoader) *Host {
	h := New()
	h.loader = loader
	return h
}

// SetModelExecutor sets the executor the host delegates ExecuteModel and
// ExecuteModelStream calls to.
func (h *Host) SetModelExecutor(executor modelExecutor) {
	if h == nil {
		return
	}
	h.mu.Lock()
	h.modelExecutor = executor
	h.mu.Unlock()
}

func (h *Host) currentModelExecutor() modelExecutor {
	if h == nil {
		return nil
	}
	h.mu.Lock()
	executor := h.modelExecutor
	h.mu.Unlock()
	return executor
}

// Snapshot returns the current capability snapshot, or an empty one if none
// has been stored.
func (h *Host) Snapshot() *Snapshot {
	if h == nil {
		return emptySnapshot()
	}
	raw := h.snapshot.Load()
	if snap, ok := raw.(*Snapshot); ok && snap != nil {
		return snap
	}
	return emptySnapshot()
}

// PluginLoaded reports whether a plugin dynamic library is still loaded by the host.
func (h *Host) PluginLoaded(id string) bool {
	if h == nil {
		return false
	}
	id = strings.TrimSpace(id)
	if id == "" {
		return false
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	_, ok := h.loaded[id]
	return ok
}

// ApplyConfig loads newly enabled plugins, registers or reconfigures every
// enabled, non-fused plugin, and publishes the resulting capability snapshot.
// When plugins are disabled or the plugin directory cannot be read, it
// publishes an empty snapshot instead.
func (h *Host) ApplyConfig(ctx context.Context, cfg *config.Config) {
	if h == nil {
		return
	}
	rc := runtimeConfigFromConfig(cfg)
	h.mu.Lock()
	h.runtimeConfig = cfg
	if !rc.Enabled {
		h.managementRoutes = make(map[string]managementRouteRecord)
		h.resourceRoutes = make(map[string]resourceRouteRecord)
		h.snapshot.Store(emptySnapshot())
		h.mu.Unlock()
		h.refreshThinkingProviders(nil)
		return
	}
	files, errSelect := selectPluginFiles(rc.Dir)
	if errSelect != nil {
		log.Warnf("pluginhost: failed to select plugin files: %v", errSelect)
		h.managementRoutes = make(map[string]managementRouteRecord)
		h.resourceRoutes = make(map[string]resourceRouteRecord)
		h.snapshot.Store(emptySnapshot())
		h.mu.Unlock()
		h.refreshThinkingProviders(nil)
		return
	}
	records := make([]capabilityRecord, 0, len(files))
	for _, file := range files {
		item, ok := rc.Items[file.ID]
		if !ok {
			item = defaultRuntimeItemConfig(file.ID)
		}
		if !item.Enabled {
			continue
		}
		if _, disabled := h.fused[file.ID]; disabled {
			continue
		}
		lp := h.loaded[file.ID]
		if lp == nil {
			loaded, errLoad := h.loadLocked(file)
			if errLoad != nil {
				log.Warnf("pluginhost: failed to load plugin %s from %s: %v", file.ID, file.Path, errLoad)
				continue
			}
			lp = loaded
			h.loaded[file.ID] = lp
			log.WithFields(log.Fields{
				"plugin_id": file.ID,
				"path":      file.Path,
			}).Info("pluginhost: plugin loaded")
		}
		plugin, okCall := h.callRegisterLocked(ctx, lp, item)
		if !okCall {
			continue
		}
		plugin.Metadata = clonePluginMetadata(plugin.Metadata)
		records = append(records, capabilityRecord{
			id:       file.ID,
			priority: item.Priority,
			meta:     plugin.Metadata,
			plugin:   plugin,
		})
	}
	sortRecords(records)
	h.snapshot.Store(&Snapshot{enabled: true, records: records})
	h.mu.Unlock()
	h.refreshThinkingProviders(records)
}

// loadLocked opens a plugin file and wraps its client in a guarded client.
// Callers must hold h.mu.
func (h *Host) loadLocked(file pluginFile) (*loadedPlugin, error) {
	client, errOpen := h.loader.Open(file, h)
	if errOpen != nil {
		return nil, errOpen
	}
	return &loadedPlugin{
		id:     file.ID,
		path:   file.Path,
		client: newGuardedPluginClient(client),
	}, nil
}

// UnloadPlugin removes one plugin from the active runtime and closes its dynamic library.
func (h *Host) UnloadPlugin(id string) bool {
	if h == nil {
		return false
	}
	id = strings.TrimSpace(id)
	if id == "" {
		return false
	}
	var target pluginUnloadTarget
	h.mu.Lock()
	lp := h.loaded[id]
	if lp == nil {
		h.mu.Unlock()
		return false
	}
	target = pluginUnloadTarget{id: lp.id, path: lp.path, client: lp.client}
	delete(h.loaded, id)
	delete(h.fused, id)
	records, enabled := h.snapshotWithoutPluginLocked(id)
	h.removePluginRuntimeStateLocked(id)
	h.snapshot.Store(&Snapshot{enabled: enabled, records: records})
	h.mu.Unlock()
	h.refreshThinkingProviders(records)
	h.RegisterFrontendAuthProviders()
	if target.client != nil {
		target.client.Shutdown()
	}
	log.WithFields(log.Fields{
		"plugin_id": target.id,
		"path":      target.path,
	}).Info("pluginhost: plugin unloaded")
	return true
}

// ShutdownAll removes active plugin capabilities and closes all loaded dynamic libraries.
func (h *Host) ShutdownAll() {
	if h == nil {
		return
	}
	targets := make([]pluginUnloadTarget, 0)
	h.mu.Lock()
	for _, lp := range h.loaded {
		if lp == nil || lp.client == nil {
			continue
		}
		targets = append(targets, pluginUnloadTarget{
			id:     lp.id,
			path:   lp.path,
			client: lp.client,
		})
	}
	h.loaded = make(map[string]*loadedPlugin)
	h.modelClientIDs = make(map[string]struct{})
	h.executorModelClientIDs = make(map[string]struct{})
	h.modelProviders = make(map[string]string)
	h.modelRegistrations = make(map[string]pluginModelRegistration)
	h.providerModels = make(map[string][]*registryModelInfo)
	h.executorProviders = make(map[string]struct{})
	h.commandLineFlags = make(map[string]commandLineFlagRecord)
	h.commandLineHits = make(map[string]struct{})
	h.managementRoutes = make(map[string]managementRouteRecord)
	h.resourceRoutes = make(map[string]resourceRouteRecord)
	h.snapshot.Store(emptySnapshot())
	h.mu.Unlock()
	h.refreshThinkingProviders(nil)
	h.RegisterFrontendAuthProviders()
	for _, target := range targets {
		target.client.Shutdown()
		log.WithFields(log.Fields{
			"plugin_id": target.id,
			"path":      target.path,
		}).Info("pluginhost: plugin unloaded")
	}
}

// snapshotWithoutPluginLocked returns the current snapshot records minus
// plugin id, together with the snapshot's enabled flag.
func (h *Host) snapshotWithoutPluginLocked(id string) ([]capabilityRecord, bool) {
	raw := h.snapshot.Load()
	snap, _ := raw.(*Snapshot)
	if snap == nil || len(snap.records) == 0 {
		return nil, snap != nil && snap.enabled
	}
	records := make([]capabilityRecord, 0, len(snap.records))
	for _, record := range snap.records {
		if record.id == id {
			continue
		}
		records = append(records, record)
	}
	return records, snap.enabled
}

// removePluginRuntimeStateLocked drops the management routes, resource routes,
// command-line flags, and model registrations owned by plugin id.
func (h *Host) removePluginRuntimeStateLocked(id string) {
	for key, record := range h.managementRoutes {
		if record.pluginID == id {
			delete(h.managementRoutes, key)
		}
	}
	for key, record := range h.resourceRoutes {
		if record.pluginID == id {
			delete(h.resourceRoutes, key)
		}
	}
	for name, record := range h.commandLineFlags {
		if record.pluginID == id {
			delete(h.commandLineFlags, name)
			delete(h.commandLineHits, name)
		}
	}
	if registration, ok := h.modelRegistrations[id]; ok {
		delete(h.providerModels, registration.provider)
	}
	delete(h.modelProviders, id)
	delete(h.modelRegistrations, id)
}

// callRegisterLocked sends register on the first call and reconfigure
// afterwards, returning the plugin only if its metadata and capabilities are valid.
func (h *Host) callRegisterLocked(ctx context.Context, lp *loadedPlugin, item runtimeItemConfig) (pluginapi.Plugin, bool) {
	if lp == nil {
		return pluginapi.Plugin{}, false
	}
	method := pluginabi.MethodPluginRegister
	if lp.registered {
		method = pluginabi.MethodPluginReconfigure
	}
	plugin, okCall := h.safePluginCallLocked(ctx, lp.id, method, func() pluginapi.Plugin {
		plugin, errRegister := registerRPCPlugin(ctx, h, lp.id, lp.client, method, item.ConfigYAML)
		if errRegister != nil {
			log.Warnf("pluginhost: plugin %s %s failed: %v", lp.id, method, errRegister)
			return pluginapi.Plugin{}
		}
		return plugin
	})
	if !okCall {
		return pluginapi.Plugin{}, false
	}
	lp.registered = true
	if !validPlugin(plugin) {
		log.Warnf("pluginhost: plugin %s returned invalid metadata or no capabilities", lp.id)
		return pluginapi.Plugin{}, false
	}
	return plugin, true
}

// safePluginCallLocked runs fn unless ctx is already done. A panic inside fn
// is recovered and fuses the plugin, so later ApplyConfig calls skip it.
func (h *Host) safePluginCallLocked(ctx context.Context, id, method string, fn func() pluginapi.Plugin) (out pluginapi.Plugin, ok bool) {
	defer func() {
		if recovered := recover(); recovered != nil {
			h.fused[id] = fmt.Sprintf("%s panic: %v", method, recovered)
			log.WithField("plugin_id", id).WithField("method", method).Errorf("pluginhost: plugin panic recovered: %v\n%s", recovered, debug.Stack())
			out = pluginapi.Plugin{}
			ok = false
		}
	}()
	if ctx != nil {
		select {
		case <-ctx.Done():
			return pluginapi.Plugin{}, false
		default:
		}
	}
	return fn(), true
}

// validPlugin requires non-empty name, version, author, and GitHub repository
// metadata plus at least one declared capability.
func validPlugin(plugin pluginapi.Plugin) bool {
	if strings.TrimSpace(plugin.Metadata.Name) == "" {
		return false
	}
	if strings.TrimSpace(plugin.Metadata.Version) == "" {
		return false
	}
	if strings.TrimSpace(plugin.Metadata.Author) == "" {
		return false
	}
	if strings.TrimSpace(plugin.Metadata.GitHubRepository) == "" {
		return false
	}
	caps := plugin.Capabilities
	return caps.ModelRegistrar != nil ||
		caps.ModelProvider != nil ||
		caps.AuthProvider != nil ||
		caps.FrontendAuthProvider != nil ||
		caps.Scheduler != nil ||
		caps.ModelRouter != nil ||
		caps.Executor != nil ||
		caps.RequestTranslator != nil ||
		caps.RequestNormalizer != nil ||
		caps.RequestInterceptor != nil ||
		caps.ResponseTranslator != nil ||
		caps.ResponseBeforeTranslator != nil ||
		caps.ResponseAfterTranslator != nil ||
		caps.ResponseInterceptor != nil ||
		caps.StreamChunkInterceptor != nil ||
		caps.ThinkingApplier != nil ||
		caps.UsagePlugin != nil ||
		caps.CommandLinePlugin != nil ||
		caps.ManagementAPI != nil
}

// typeName returns the dynamic Go type of v as a string.
func typeName(v any) string {
	return fmt.Sprintf("%T", v)
}