mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-11 16:53:43 +08:00
- Introduced `HasStreamResponseTransformer` and `HasNonStreamResponseTransformer` to handle streaming and non-streaming transformations. - Updated `executorResponseTranslatorExists` logic to correctly validate stream-specific transformers. - Enhanced `TranslateStream` to suppress raw fallback when registered native transformers return empty output. - Added comprehensive tests (`TestHasResponseTransformerChecksConcreteResponseKinds`, `TestHasResponseTransformerIgnoresEmptyRegistration`) for stream and non-stream transformer validation.
2300 lines
72 KiB
Go
2300 lines
72 KiB
Go
package pluginhost
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"reflect"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/router-for-me/CLIProxyAPI/v7/internal/registry"
|
|
"github.com/router-for-me/CLIProxyAPI/v7/internal/thinking"
|
|
sdkaccess "github.com/router-for-me/CLIProxyAPI/v7/sdk/access"
|
|
coreauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
|
|
coreexecutor "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/executor"
|
|
coreusage "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage"
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
|
|
sdktranslator "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator"
|
|
_ "github.com/router-for-me/CLIProxyAPI/v7/sdk/translator/builtin"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type registryModelInfo = registry.ModelInfo
|
|
|
|
type modelRegistry interface {
|
|
RegisterClient(clientID, clientProvider string, models []*registry.ModelInfo)
|
|
UnregisterClient(clientID string)
|
|
}
|
|
|
|
type modelProviderRegistry interface {
|
|
modelRegistry
|
|
GetModelProviders(modelID string) []string
|
|
}
|
|
|
|
type pluginModelRegistration struct {
|
|
pluginID string
|
|
provider string
|
|
priority int
|
|
models []*registry.ModelInfo
|
|
hasExecutor bool
|
|
}
|
|
|
|
func normalizedExecutorModelScope(caps pluginapi.Capabilities) pluginapi.ExecutorModelScope {
|
|
if caps.Executor == nil {
|
|
return pluginapi.ExecutorModelScopeBoth
|
|
}
|
|
switch caps.ExecutorModelScope {
|
|
case pluginapi.ExecutorModelScopeStatic, pluginapi.ExecutorModelScopeOAuth, pluginapi.ExecutorModelScopeBoth:
|
|
return caps.ExecutorModelScope
|
|
default:
|
|
return pluginapi.ExecutorModelScopeBoth
|
|
}
|
|
}
|
|
|
|
func executorScopeAllowsStaticModels(caps pluginapi.Capabilities) bool {
|
|
if caps.Executor == nil {
|
|
return true
|
|
}
|
|
scope := normalizedExecutorModelScope(caps)
|
|
return scope == pluginapi.ExecutorModelScopeStatic || scope == pluginapi.ExecutorModelScopeBoth
|
|
}
|
|
|
|
func executorScopeAllowsOAuthModels(caps pluginapi.Capabilities) bool {
|
|
if caps.Executor == nil {
|
|
return true
|
|
}
|
|
scope := normalizedExecutorModelScope(caps)
|
|
return scope == pluginapi.ExecutorModelScopeOAuth || scope == pluginapi.ExecutorModelScopeBoth
|
|
}
|
|
|
|
func normalizeExecutorFormats(raw []string) []sdktranslator.Format {
|
|
if len(raw) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]sdktranslator.Format, 0, len(raw))
|
|
seen := make(map[string]struct{}, len(raw))
|
|
for _, item := range raw {
|
|
format := normalizeExecutorFormatName(item)
|
|
if format == "" {
|
|
continue
|
|
}
|
|
key := format.String()
|
|
if _, exists := seen[key]; exists {
|
|
continue
|
|
}
|
|
seen[key] = struct{}{}
|
|
out = append(out, format)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func normalizeExecutorFormatName(raw string) sdktranslator.Format {
|
|
switch strings.ToLower(strings.TrimSpace(raw)) {
|
|
case "", "none":
|
|
return ""
|
|
case "chat-completions", "chat_completions", "openai-chat-completions", "openai_chat_completions":
|
|
return sdktranslator.FormatOpenAI
|
|
case "responses", "openai-responses", "openai_responses":
|
|
return sdktranslator.FormatOpenAIResponse
|
|
case "anthropic":
|
|
return sdktranslator.FormatClaude
|
|
default:
|
|
return sdktranslator.FromString(strings.TrimSpace(raw))
|
|
}
|
|
}
|
|
|
|
func executorFormatContains(formats []sdktranslator.Format, target sdktranslator.Format) bool {
|
|
if target == "" {
|
|
return false
|
|
}
|
|
for _, format := range formats {
|
|
if format == target {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
type AuthModelResult struct {
|
|
Provider string
|
|
Models []*registry.ModelInfo
|
|
Auth *coreauth.Auth
|
|
Handled bool
|
|
Err error
|
|
}
|
|
|
|
func pluginModelInfoToRegistryModelInfo(model pluginapi.ModelInfo) *registry.ModelInfo {
|
|
return ®istry.ModelInfo{
|
|
ID: model.ID,
|
|
Object: model.Object,
|
|
Created: model.Created,
|
|
OwnedBy: model.OwnedBy,
|
|
Type: model.Type,
|
|
DisplayName: model.DisplayName,
|
|
Name: model.Name,
|
|
Version: model.Version,
|
|
Description: model.Description,
|
|
InputTokenLimit: int(model.InputTokenLimit),
|
|
OutputTokenLimit: int(model.OutputTokenLimit),
|
|
SupportedGenerationMethods: cloneStringSlice(model.SupportedGenerationMethods),
|
|
ContextLength: int(model.ContextLength),
|
|
MaxCompletionTokens: int(model.MaxCompletionTokens),
|
|
SupportedParameters: cloneStringSlice(model.SupportedParameters),
|
|
SupportedInputModalities: cloneStringSlice(model.SupportedInputModalities),
|
|
SupportedOutputModalities: cloneStringSlice(model.SupportedOutputModalities),
|
|
Thinking: pluginThinkingSupportToRegistryThinkingSupport(model.Thinking),
|
|
UserDefined: model.UserDefined,
|
|
}
|
|
}
|
|
|
|
func pluginThinkingSupportToRegistryThinkingSupport(thinking *pluginapi.ThinkingSupport) *registry.ThinkingSupport {
|
|
if thinking == nil {
|
|
return nil
|
|
}
|
|
return ®istry.ThinkingSupport{
|
|
Min: thinking.Min,
|
|
Max: thinking.Max,
|
|
ZeroAllowed: thinking.ZeroAllowed,
|
|
DynamicAllowed: thinking.DynamicAllowed,
|
|
Levels: cloneStringSlice(thinking.Levels),
|
|
}
|
|
}
|
|
|
|
func registryModelInfoToPluginModelInfo(model *registry.ModelInfo) pluginapi.ModelInfo {
|
|
if model == nil {
|
|
return pluginapi.ModelInfo{}
|
|
}
|
|
return pluginapi.ModelInfo{
|
|
ID: model.ID,
|
|
Object: model.Object,
|
|
Created: model.Created,
|
|
OwnedBy: model.OwnedBy,
|
|
Type: model.Type,
|
|
DisplayName: model.DisplayName,
|
|
Name: model.Name,
|
|
Version: model.Version,
|
|
Description: model.Description,
|
|
InputTokenLimit: int64(model.InputTokenLimit),
|
|
OutputTokenLimit: int64(model.OutputTokenLimit),
|
|
SupportedGenerationMethods: cloneStringSlice(model.SupportedGenerationMethods),
|
|
ContextLength: int64(model.ContextLength),
|
|
MaxCompletionTokens: int64(model.MaxCompletionTokens),
|
|
SupportedParameters: cloneStringSlice(model.SupportedParameters),
|
|
SupportedInputModalities: cloneStringSlice(model.SupportedInputModalities),
|
|
SupportedOutputModalities: cloneStringSlice(model.SupportedOutputModalities),
|
|
Thinking: registryThinkingSupportToPluginThinkingSupport(model.Thinking),
|
|
UserDefined: model.UserDefined,
|
|
}
|
|
}
|
|
|
|
func registryThinkingSupportToPluginThinkingSupport(thinking *registry.ThinkingSupport) *pluginapi.ThinkingSupport {
|
|
if thinking == nil {
|
|
return nil
|
|
}
|
|
return &pluginapi.ThinkingSupport{
|
|
Min: thinking.Min,
|
|
Max: thinking.Max,
|
|
ZeroAllowed: thinking.ZeroAllowed,
|
|
DynamicAllowed: thinking.DynamicAllowed,
|
|
Levels: cloneStringSlice(thinking.Levels),
|
|
}
|
|
}
|
|
|
|
func cloneStringSlice(in []string) []string {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
return append([]string(nil), in...)
|
|
}
|
|
|
|
func cloneRegistryModels(in []*registry.ModelInfo) []*registry.ModelInfo {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]*registry.ModelInfo, 0, len(in))
|
|
for _, model := range in {
|
|
if model == nil {
|
|
continue
|
|
}
|
|
copyModel := *model
|
|
copyModel.SupportedGenerationMethods = cloneStringSlice(model.SupportedGenerationMethods)
|
|
copyModel.SupportedParameters = cloneStringSlice(model.SupportedParameters)
|
|
copyModel.SupportedInputModalities = cloneStringSlice(model.SupportedInputModalities)
|
|
copyModel.SupportedOutputModalities = cloneStringSlice(model.SupportedOutputModalities)
|
|
if model.Thinking != nil {
|
|
thinking := *model.Thinking
|
|
thinking.Levels = cloneStringSlice(model.Thinking.Levels)
|
|
copyModel.Thinking = &thinking
|
|
}
|
|
out = append(out, ©Model)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (h *Host) RegisterModels(ctx context.Context, modelRegistry modelRegistry) {
|
|
if h == nil || modelRegistry == nil {
|
|
return
|
|
}
|
|
|
|
snap := h.Snapshot()
|
|
registrations := make([]modelClientRegistration, 0)
|
|
nextClients := make(map[string]struct{})
|
|
nextProviders := make(map[string]string)
|
|
nextModelRegistrations := make(map[string]pluginModelRegistration)
|
|
for _, record := range snap.records {
|
|
modelProvider := record.plugin.Capabilities.ModelProvider
|
|
registrar := record.plugin.Capabilities.ModelRegistrar
|
|
if modelProvider == nil && registrar == nil {
|
|
continue
|
|
}
|
|
if !executorScopeAllowsStaticModels(record.plugin.Capabilities) {
|
|
continue
|
|
}
|
|
var resp pluginapi.ModelRegistrationResponse
|
|
var errRegisterModels error
|
|
if modelProvider != nil {
|
|
modelResp, errStaticModels := h.callModelProviderStaticModels(ctx, record, modelProvider)
|
|
errRegisterModels = errStaticModels
|
|
resp = pluginapi.ModelRegistrationResponse{
|
|
Provider: modelResp.Provider,
|
|
Models: modelResp.Models,
|
|
}
|
|
} else {
|
|
resp, errRegisterModels = h.callModelRegistrar(ctx, record, registrar)
|
|
}
|
|
if errRegisterModels != nil {
|
|
log.Warnf("pluginhost: model registrar %s failed: %v", record.id, errRegisterModels)
|
|
continue
|
|
}
|
|
|
|
provider := strings.ToLower(strings.TrimSpace(resp.Provider))
|
|
if provider == "" || len(resp.Models) == 0 {
|
|
continue
|
|
}
|
|
|
|
models := make([]*registry.ModelInfo, 0, len(resp.Models))
|
|
for _, item := range resp.Models {
|
|
model := pluginModelInfoToRegistryModelInfo(item)
|
|
if model == nil || strings.TrimSpace(model.ID) == "" {
|
|
continue
|
|
}
|
|
model.ID = strings.TrimSpace(model.ID)
|
|
models = append(models, model)
|
|
}
|
|
if len(models) == 0 {
|
|
continue
|
|
}
|
|
|
|
nextModelRegistrations[record.id] = pluginModelRegistration{
|
|
pluginID: record.id,
|
|
provider: provider,
|
|
priority: record.priority,
|
|
models: cloneRegistryModels(models),
|
|
hasExecutor: record.plugin.Capabilities.Executor != nil,
|
|
}
|
|
nextProviders[record.id] = provider
|
|
if record.plugin.Capabilities.Executor == nil {
|
|
clientID := "plugin:" + record.id + ":" + provider
|
|
registrations = append(registrations, modelClientRegistration{
|
|
clientID: clientID,
|
|
provider: provider,
|
|
models: models,
|
|
})
|
|
nextClients[clientID] = struct{}{}
|
|
}
|
|
}
|
|
h.commitModelClients(snap, modelRegistry, registrations, nextClients, nextProviders, nextModelRegistrations)
|
|
}
|
|
|
|
func (h *Host) ModelsForAuth(ctx context.Context, auth *coreauth.Auth) AuthModelResult {
|
|
if h == nil || auth == nil {
|
|
return AuthModelResult{}
|
|
}
|
|
providerKey := normalizeProviderID(auth.Provider)
|
|
if providerKey == "" {
|
|
return AuthModelResult{}
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
modelProvider := record.plugin.Capabilities.ModelProvider
|
|
if modelProvider == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
if !executorScopeAllowsOAuthModels(record.plugin.Capabilities) {
|
|
continue
|
|
}
|
|
authProvider := record.plugin.Capabilities.AuthProvider
|
|
if authProvider != nil {
|
|
identifier, okIdentifier := h.callAuthProviderIdentifier(record.id, authProvider)
|
|
if !okIdentifier || normalizeProviderID(identifier) != providerKey {
|
|
continue
|
|
}
|
|
} else {
|
|
recordProvider := normalizeProviderID(h.modelProvider(record.id))
|
|
if recordProvider == "" {
|
|
executor := record.plugin.Capabilities.Executor
|
|
if executor != nil {
|
|
candidate, okCandidate := h.executorProvider(record, executor)
|
|
if okCandidate {
|
|
recordProvider = candidate
|
|
}
|
|
}
|
|
}
|
|
if recordProvider != providerKey {
|
|
continue
|
|
}
|
|
}
|
|
resp, errModels := h.callModelsForAuth(ctx, record, modelProvider, auth)
|
|
if errModels != nil {
|
|
log.Warnf("pluginhost: models for auth %s failed: %v", auth.ID, errModels)
|
|
return AuthModelResult{Handled: true, Err: errModels}
|
|
}
|
|
respProvider := normalizeProviderID(resp.Provider)
|
|
if respProvider != "" && respProvider != providerKey {
|
|
continue
|
|
}
|
|
if respProvider == "" {
|
|
respProvider = providerKey
|
|
}
|
|
models := make([]*registry.ModelInfo, 0, len(resp.Models))
|
|
for _, item := range resp.Models {
|
|
model := pluginModelInfoToRegistryModelInfo(item)
|
|
if model != nil {
|
|
model.ID = strings.TrimSpace(model.ID)
|
|
}
|
|
if model != nil && model.ID != "" {
|
|
models = append(models, model)
|
|
}
|
|
}
|
|
path := ""
|
|
if auth.Attributes != nil {
|
|
path = auth.Attributes["path"]
|
|
}
|
|
var updated *coreauth.Auth
|
|
if authDataHasValue(resp.AuthUpdate) {
|
|
updated = h.AuthDataToCoreAuth(authDataWithDefaults(resp.AuthUpdate, auth), path, auth.FileName)
|
|
}
|
|
return AuthModelResult{Provider: respProvider, Models: models, Auth: updated, Handled: true}
|
|
}
|
|
return AuthModelResult{}
|
|
}
|
|
|
|
func authDataHasValue(data pluginapi.AuthData) bool {
|
|
return strings.TrimSpace(data.Provider) != "" ||
|
|
strings.TrimSpace(data.ID) != "" ||
|
|
strings.TrimSpace(data.FileName) != "" ||
|
|
strings.TrimSpace(data.Label) != "" ||
|
|
strings.TrimSpace(data.Prefix) != "" ||
|
|
strings.TrimSpace(data.ProxyURL) != "" ||
|
|
data.Disabled ||
|
|
len(data.StorageJSON) > 0 ||
|
|
len(data.Metadata) > 0 ||
|
|
len(data.Attributes) > 0 ||
|
|
!data.NextRefreshAfter.IsZero()
|
|
}
|
|
|
|
func authDataWithDefaults(data pluginapi.AuthData, auth *coreauth.Auth) pluginapi.AuthData {
|
|
if auth == nil {
|
|
return data
|
|
}
|
|
if strings.TrimSpace(data.Provider) == "" {
|
|
data.Provider = auth.Provider
|
|
}
|
|
if strings.TrimSpace(data.ID) == "" {
|
|
data.ID = auth.ID
|
|
}
|
|
if strings.TrimSpace(data.FileName) == "" {
|
|
data.FileName = auth.FileName
|
|
}
|
|
if strings.TrimSpace(data.Label) == "" {
|
|
data.Label = auth.Label
|
|
}
|
|
if strings.TrimSpace(data.Prefix) == "" {
|
|
data.Prefix = auth.Prefix
|
|
}
|
|
if strings.TrimSpace(data.ProxyURL) == "" {
|
|
data.ProxyURL = auth.ProxyURL
|
|
}
|
|
if len(data.Metadata) == 0 {
|
|
data.Metadata = cloneAnyMap(auth.Metadata)
|
|
} else {
|
|
metadata := cloneAnyMap(data.Metadata)
|
|
for key, value := range auth.Metadata {
|
|
if _, exists := metadata[key]; !exists {
|
|
metadata[key] = value
|
|
}
|
|
}
|
|
data.Metadata = metadata
|
|
}
|
|
if len(data.Attributes) == 0 {
|
|
data.Attributes = cloneStringMap(auth.Attributes)
|
|
} else {
|
|
attributes := cloneStringMap(data.Attributes)
|
|
for key, value := range auth.Attributes {
|
|
if _, exists := attributes[key]; !exists {
|
|
attributes[key] = value
|
|
}
|
|
}
|
|
data.Attributes = attributes
|
|
}
|
|
if len(data.StorageJSON) == 0 {
|
|
data.StorageJSON = storageJSONFromAuth(auth)
|
|
}
|
|
if data.NextRefreshAfter.IsZero() {
|
|
data.NextRefreshAfter = auth.NextRefreshAfter
|
|
}
|
|
return data
|
|
}
|
|
|
|
type modelClientRegistration struct {
|
|
clientID string
|
|
provider string
|
|
models []*registry.ModelInfo
|
|
}
|
|
|
|
func (h *Host) callModelRegistrar(ctx context.Context, record capabilityRecord, registrar pluginapi.ModelRegistrar) (resp pluginapi.ModelRegistrationResponse, err error) {
|
|
if h == nil || registrar == nil || h.isPluginFused(record.id) {
|
|
return pluginapi.ModelRegistrationResponse{}, nil
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "ModelRegistrar.RegisterModels", recovered)
|
|
resp = pluginapi.ModelRegistrationResponse{}
|
|
err = fmt.Errorf("model registrar panic: %v", recovered)
|
|
}
|
|
}()
|
|
return registrar.RegisterModels(ctx, pluginapi.ModelRegistrationRequest{Plugin: record.meta})
|
|
}
|
|
|
|
func (h *Host) callModelProviderStaticModels(ctx context.Context, record capabilityRecord, provider pluginapi.ModelProvider) (resp pluginapi.ModelResponse, err error) {
|
|
if h == nil || provider == nil || h.isPluginFused(record.id) {
|
|
return pluginapi.ModelResponse{}, nil
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "ModelProvider.StaticModels", recovered)
|
|
resp = pluginapi.ModelResponse{}
|
|
err = fmt.Errorf("model provider panic: %v", recovered)
|
|
}
|
|
}()
|
|
return provider.StaticModels(ctx, pluginapi.StaticModelRequest{
|
|
Plugin: record.meta,
|
|
Host: h.hostConfigSummary(),
|
|
})
|
|
}
|
|
|
|
func (h *Host) callModelsForAuth(ctx context.Context, record capabilityRecord, provider pluginapi.ModelProvider, auth *coreauth.Auth) (resp pluginapi.ModelResponse, err error) {
|
|
if h == nil || provider == nil || auth == nil || h.isPluginFused(record.id) {
|
|
return pluginapi.ModelResponse{}, nil
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "ModelProvider.ModelsForAuth", recovered)
|
|
resp = pluginapi.ModelResponse{}
|
|
err = fmt.Errorf("model provider per-auth models panic: %v", recovered)
|
|
}
|
|
}()
|
|
return provider.ModelsForAuth(ctx, pluginapi.AuthModelRequest{
|
|
Plugin: record.meta,
|
|
AuthID: auth.ID,
|
|
AuthProvider: auth.Provider,
|
|
StorageJSON: storageJSONFromAuth(auth),
|
|
Metadata: cloneAnyMap(auth.Metadata),
|
|
Attributes: cloneStringMap(auth.Attributes),
|
|
Host: h.hostConfigSummary(),
|
|
HTTPClient: h.newHTTPClient(auth),
|
|
})
|
|
}
|
|
|
|
func (h *Host) callRequestInterceptor(ctx context.Context, pluginID, method string, call func(context.Context, pluginapi.RequestInterceptRequest) (pluginapi.RequestInterceptResponse, error), req pluginapi.RequestInterceptRequest) (out pluginapi.RequestInterceptResponse, ok bool) {
|
|
if h == nil || call == nil || h.isPluginFused(pluginID) {
|
|
return pluginapi.RequestInterceptResponse{}, false
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, method, recovered)
|
|
out = pluginapi.RequestInterceptResponse{}
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errIntercept := call(ctx, req)
|
|
if errIntercept != nil {
|
|
log.Warnf("pluginhost: request interceptor %s failed: %v", pluginID, errIntercept)
|
|
return pluginapi.RequestInterceptResponse{}, false
|
|
}
|
|
return resp, true
|
|
}
|
|
|
|
func (h *Host) callResponseInterceptor(ctx context.Context, pluginID string, interceptor pluginapi.ResponseInterceptor, req pluginapi.ResponseInterceptRequest) (out pluginapi.ResponseInterceptResponse, ok bool) {
|
|
if h == nil || interceptor == nil || h.isPluginFused(pluginID) {
|
|
return pluginapi.ResponseInterceptResponse{}, false
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, "ResponseInterceptor.InterceptResponse", recovered)
|
|
out = pluginapi.ResponseInterceptResponse{}
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errIntercept := interceptor.InterceptResponse(ctx, req)
|
|
if errIntercept != nil {
|
|
log.Warnf("pluginhost: response interceptor %s failed: %v", pluginID, errIntercept)
|
|
return pluginapi.ResponseInterceptResponse{}, false
|
|
}
|
|
return resp, true
|
|
}
|
|
|
|
func (h *Host) callStreamChunkInterceptor(ctx context.Context, pluginID string, interceptor pluginapi.StreamChunkInterceptor, req pluginapi.StreamChunkInterceptRequest) (out pluginapi.StreamChunkInterceptResponse, ok bool) {
|
|
if h == nil || interceptor == nil || h.isPluginFused(pluginID) {
|
|
return pluginapi.StreamChunkInterceptResponse{}, false
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, "StreamChunkInterceptor.InterceptStreamChunk", recovered)
|
|
out = pluginapi.StreamChunkInterceptResponse{}
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errIntercept := interceptor.InterceptStreamChunk(ctx, req)
|
|
if errIntercept != nil {
|
|
log.Warnf("pluginhost: stream chunk interceptor %s failed: %v", pluginID, errIntercept)
|
|
return pluginapi.StreamChunkInterceptResponse{}, false
|
|
}
|
|
return resp, true
|
|
}
|
|
|
|
func (h *Host) InterceptRequestBeforeAuth(ctx context.Context, req pluginapi.RequestInterceptRequest) pluginapi.RequestInterceptResponse {
|
|
return h.interceptRequest(ctx, req, "RequestInterceptor.InterceptRequestBeforeAuth", func(interceptor pluginapi.RequestInterceptor, ctx context.Context, req pluginapi.RequestInterceptRequest) (pluginapi.RequestInterceptResponse, error) {
|
|
return interceptor.InterceptRequestBeforeAuth(ctx, req)
|
|
})
|
|
}
|
|
|
|
func (h *Host) InterceptRequestAfterAuth(ctx context.Context, req pluginapi.RequestInterceptRequest) pluginapi.RequestInterceptResponse {
|
|
return h.interceptRequest(ctx, req, "RequestInterceptor.InterceptRequestAfterAuth", func(interceptor pluginapi.RequestInterceptor, ctx context.Context, req pluginapi.RequestInterceptRequest) (pluginapi.RequestInterceptResponse, error) {
|
|
return interceptor.InterceptRequestAfterAuth(ctx, req)
|
|
})
|
|
}
|
|
|
|
func (h *Host) interceptRequest(ctx context.Context, req pluginapi.RequestInterceptRequest, method string, invoke func(pluginapi.RequestInterceptor, context.Context, pluginapi.RequestInterceptRequest) (pluginapi.RequestInterceptResponse, error)) pluginapi.RequestInterceptResponse {
|
|
current := pluginapi.RequestInterceptResponse{
|
|
Headers: cloneHeader(req.Headers),
|
|
Body: bytes.Clone(req.Body),
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
interceptor := record.plugin.Capabilities.RequestInterceptor
|
|
if h.isPluginFused(record.id) || interceptor == nil {
|
|
continue
|
|
}
|
|
nextReq := req
|
|
nextReq.Headers = cloneHeader(current.Headers)
|
|
nextReq.Body = bytes.Clone(current.Body)
|
|
nextReq.Metadata = cloneInterceptorMetadata(req.Metadata)
|
|
if resp, ok := h.callRequestInterceptor(ctx, record.id, method, func(callCtx context.Context, callReq pluginapi.RequestInterceptRequest) (pluginapi.RequestInterceptResponse, error) {
|
|
return invoke(interceptor, callCtx, callReq)
|
|
}, nextReq); ok {
|
|
current.Headers = mergeHeaders(current.Headers, resp.Headers, resp.ClearHeaders)
|
|
if len(resp.Body) > 0 {
|
|
current.Body = bytes.Clone(resp.Body)
|
|
}
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) InterceptResponse(ctx context.Context, req pluginapi.ResponseInterceptRequest) pluginapi.ResponseInterceptResponse {
|
|
current := pluginapi.ResponseInterceptResponse{
|
|
Headers: cloneHeader(req.ResponseHeaders),
|
|
Body: bytes.Clone(req.Body),
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
interceptor := record.plugin.Capabilities.ResponseInterceptor
|
|
if h.isPluginFused(record.id) || interceptor == nil {
|
|
continue
|
|
}
|
|
nextReq := req
|
|
nextReq.RequestHeaders = cloneHeader(req.RequestHeaders)
|
|
nextReq.ResponseHeaders = cloneHeader(current.Headers)
|
|
nextReq.OriginalRequest = bytes.Clone(req.OriginalRequest)
|
|
nextReq.RequestBody = bytes.Clone(req.RequestBody)
|
|
nextReq.Body = bytes.Clone(current.Body)
|
|
nextReq.Metadata = cloneInterceptorMetadata(req.Metadata)
|
|
if resp, ok := h.callResponseInterceptor(ctx, record.id, interceptor, nextReq); ok {
|
|
current.Headers = mergeHeaders(current.Headers, resp.Headers, resp.ClearHeaders)
|
|
if len(resp.Body) > 0 {
|
|
current.Body = bytes.Clone(resp.Body)
|
|
}
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) InterceptStreamChunk(ctx context.Context, req pluginapi.StreamChunkInterceptRequest) pluginapi.StreamChunkInterceptResponse {
|
|
current := pluginapi.StreamChunkInterceptResponse{
|
|
Headers: cloneHeader(req.ResponseHeaders),
|
|
Body: bytes.Clone(req.Body),
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
interceptor := record.plugin.Capabilities.StreamChunkInterceptor
|
|
if h.isPluginFused(record.id) || interceptor == nil || current.DropChunk {
|
|
continue
|
|
}
|
|
nextReq := req
|
|
nextReq.RequestHeaders = cloneHeader(req.RequestHeaders)
|
|
nextReq.ResponseHeaders = cloneHeader(current.Headers)
|
|
nextReq.OriginalRequest = bytes.Clone(req.OriginalRequest)
|
|
nextReq.RequestBody = bytes.Clone(req.RequestBody)
|
|
nextReq.Body = bytes.Clone(current.Body)
|
|
nextReq.HistoryChunks = cloneByteSlices(req.HistoryChunks)
|
|
nextReq.Metadata = cloneInterceptorMetadata(req.Metadata)
|
|
if resp, ok := h.callStreamChunkInterceptor(ctx, record.id, interceptor, nextReq); ok {
|
|
current.Headers = mergeHeaders(current.Headers, resp.Headers, resp.ClearHeaders)
|
|
if len(resp.Body) > 0 {
|
|
current.Body = bytes.Clone(resp.Body)
|
|
}
|
|
if resp.DropChunk {
|
|
current.DropChunk = true
|
|
}
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) HasStreamInterceptors() bool {
|
|
if h == nil {
|
|
return false
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
if h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
if record.plugin.Capabilities.StreamChunkInterceptor != nil {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *Host) HasRequestInterceptors() bool {
|
|
if h == nil {
|
|
return false
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
if h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
if record.plugin.Capabilities.RequestInterceptor != nil {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *Host) commitModelClients(snap *Snapshot, modelRegistry modelRegistry, registrations []modelClientRegistration, nextClients map[string]struct{}, nextProviders map[string]string, nextModelRegistrations map[string]pluginModelRegistration) {
|
|
if h == nil || modelRegistry == nil {
|
|
return
|
|
}
|
|
|
|
staleClients := make([]string, 0)
|
|
h.mu.Lock()
|
|
if h.Snapshot() != snap {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
for clientID := range h.modelClientIDs {
|
|
if _, okClient := nextClients[clientID]; !okClient {
|
|
staleClients = append(staleClients, clientID)
|
|
}
|
|
}
|
|
h.modelClientIDs = nextClients
|
|
h.modelProviders = nextProviders
|
|
h.modelRegistrations = nextModelRegistrations
|
|
h.mu.Unlock()
|
|
|
|
for _, registration := range registrations {
|
|
modelRegistry.RegisterClient(registration.clientID, registration.provider, registration.models)
|
|
}
|
|
for _, clientID := range staleClients {
|
|
modelRegistry.UnregisterClient(clientID)
|
|
}
|
|
}
|
|
|
|
type executorManager interface {
|
|
Executor(provider string) (coreauth.ProviderExecutor, bool)
|
|
RegisterExecutor(coreauth.ProviderExecutor)
|
|
UnregisterExecutor(provider string)
|
|
}
|
|
|
|
type executorRegistration struct {
|
|
provider string
|
|
adapter *executorAdapter
|
|
}
|
|
|
|
func (h *Host) RegisterExecutors(manager executorManager, modelRegistry modelProviderRegistry) {
|
|
if h == nil || manager == nil {
|
|
return
|
|
}
|
|
|
|
snap := h.Snapshot()
|
|
registrations := h.snapshotModelRegistrations()
|
|
selectedModels := make(map[string][]*registry.ModelInfo)
|
|
providerModels := make(map[string][]*registry.ModelInfo)
|
|
claimedModels := make(map[string]struct{})
|
|
claimedProviders := make(map[string]string)
|
|
for _, registration := range registrations {
|
|
if !registration.hasExecutor {
|
|
appendModelsForProvider(providerModels, registration.provider, registration.models)
|
|
}
|
|
}
|
|
for _, record := range snap.records {
|
|
executor := record.plugin.Capabilities.Executor
|
|
if executor == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
provider, okProvider := h.executorProvider(record, executor)
|
|
if !okProvider {
|
|
continue
|
|
}
|
|
registration := h.modelRegistration(record.id)
|
|
if h.providerHasNativeExecutor(manager, provider) {
|
|
appendModelsForProvider(providerModels, provider, registration.models)
|
|
continue
|
|
}
|
|
if len(registration.models) == 0 {
|
|
continue
|
|
}
|
|
if owner := claimedProviders[provider]; owner != "" && owner != record.id {
|
|
continue
|
|
}
|
|
for _, model := range registration.models {
|
|
modelID := strings.TrimSpace(model.ID)
|
|
if modelID == "" {
|
|
continue
|
|
}
|
|
if _, claimed := claimedModels[modelID]; claimed {
|
|
continue
|
|
}
|
|
if h.modelHasNativeExecutor(manager, modelRegistry, modelID) {
|
|
continue
|
|
}
|
|
claimedModels[modelID] = struct{}{}
|
|
claimedProviders[provider] = record.id
|
|
selectedModels[record.id] = append(selectedModels[record.id], model)
|
|
}
|
|
}
|
|
|
|
seenProviders := make(map[string]struct{})
|
|
nextProviders := make(map[string]struct{})
|
|
nextModelClients := make(map[string]struct{})
|
|
executorRegistrations := make([]executorRegistration, 0)
|
|
modelClientRegistrations := make([]modelClientRegistration, 0)
|
|
for _, record := range snap.records {
|
|
executor := record.plugin.Capabilities.Executor
|
|
if executor == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
|
|
provider, okProvider := h.executorProvider(record, executor)
|
|
if !okProvider {
|
|
continue
|
|
}
|
|
registration := h.modelRegistration(record.id)
|
|
if len(registration.models) > 0 && len(selectedModels[record.id]) == 0 {
|
|
continue
|
|
}
|
|
if _, seenProvider := seenProviders[provider]; seenProvider {
|
|
continue
|
|
}
|
|
seenProviders[provider] = struct{}{}
|
|
if h.providerHasNativeExecutor(manager, provider) {
|
|
continue
|
|
}
|
|
|
|
nextProviders[provider] = struct{}{}
|
|
executorRegistrations = append(executorRegistrations, newExecutorAdapterRegistration(h, record, provider, executor))
|
|
appendModelsForProvider(providerModels, provider, selectedModels[record.id])
|
|
if len(selectedModels[record.id]) > 0 {
|
|
clientID := pluginExecutorModelClientID(record.id, provider)
|
|
modelClientRegistrations = append(modelClientRegistrations, modelClientRegistration{
|
|
clientID: clientID,
|
|
provider: provider,
|
|
models: selectedModels[record.id],
|
|
})
|
|
nextModelClients[clientID] = struct{}{}
|
|
}
|
|
}
|
|
h.commitExecutorState(snap, manager, modelRegistry, providerModels, executorRegistrations, nextProviders, modelClientRegistrations, nextModelClients)
|
|
}
|
|
|
|
func pluginExecutorModelClientID(pluginID, provider string) string {
|
|
return "plugin:" + pluginID + ":" + provider + ":executor"
|
|
}
|
|
|
|
func (h *Host) commitExecutorState(snap *Snapshot, manager executorManager, modelRegistry modelRegistry, providerModels map[string][]*registry.ModelInfo, registrations []executorRegistration, nextProviders map[string]struct{}, modelClientRegistrations []modelClientRegistration, nextModelClients map[string]struct{}) {
|
|
if h == nil || manager == nil {
|
|
return
|
|
}
|
|
|
|
h.mu.Lock()
|
|
if h.Snapshot() != snap {
|
|
h.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
h.providerModels = make(map[string][]*registryModelInfo, len(providerModels))
|
|
for provider, models := range providerModels {
|
|
h.providerModels[provider] = cloneRegistryModels(models)
|
|
}
|
|
|
|
staleProviders := make([]string, 0)
|
|
for provider := range h.executorProviders {
|
|
if _, okProvider := nextProviders[provider]; !okProvider {
|
|
staleProviders = append(staleProviders, provider)
|
|
}
|
|
}
|
|
h.executorProviders = nextProviders
|
|
if nextModelClients == nil {
|
|
nextModelClients = make(map[string]struct{})
|
|
}
|
|
staleModelClients := make([]string, 0)
|
|
for clientID := range h.executorModelClientIDs {
|
|
if _, okClient := nextModelClients[clientID]; !okClient {
|
|
staleModelClients = append(staleModelClients, clientID)
|
|
}
|
|
}
|
|
h.executorModelClientIDs = nextModelClients
|
|
|
|
for _, registration := range registrations {
|
|
if registration.adapter == nil || registration.provider == "" {
|
|
continue
|
|
}
|
|
manager.RegisterExecutor(registration.adapter)
|
|
}
|
|
for _, provider := range staleProviders {
|
|
existing, okExecutor := manager.Executor(provider)
|
|
if !okExecutor || !h.ownsExecutor(existing) {
|
|
continue
|
|
}
|
|
manager.UnregisterExecutor(provider)
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
if modelRegistry == nil {
|
|
return
|
|
}
|
|
for _, registration := range modelClientRegistrations {
|
|
modelRegistry.RegisterClient(registration.clientID, registration.provider, registration.models)
|
|
}
|
|
for _, clientID := range staleModelClients {
|
|
modelRegistry.UnregisterClient(clientID)
|
|
}
|
|
}
|
|
|
|
func newExecutorAdapterRegistration(h *Host, record capabilityRecord, provider string, executor pluginapi.ProviderExecutor) executorRegistration {
|
|
return executorRegistration{
|
|
provider: provider,
|
|
adapter: &executorAdapter{
|
|
host: h,
|
|
pluginID: record.id,
|
|
provider: provider,
|
|
executor: executor,
|
|
inputFormats: normalizeExecutorFormats(record.plugin.Capabilities.ExecutorInputFormats),
|
|
outputFormats: normalizeExecutorFormats(record.plugin.Capabilities.ExecutorOutputFormats),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (h *Host) snapshotModelRegistrations() []pluginModelRegistration {
|
|
if h == nil {
|
|
return nil
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
registrations := make([]pluginModelRegistration, 0, len(h.modelRegistrations))
|
|
for _, registration := range h.modelRegistrations {
|
|
registration.models = cloneRegistryModels(registration.models)
|
|
registrations = append(registrations, registration)
|
|
}
|
|
sort.SliceStable(registrations, func(i, j int) bool {
|
|
if registrations[i].priority == registrations[j].priority {
|
|
return registrations[i].pluginID < registrations[j].pluginID
|
|
}
|
|
return registrations[i].priority > registrations[j].priority
|
|
})
|
|
return registrations
|
|
}
|
|
|
|
func (h *Host) modelRegistration(pluginID string) pluginModelRegistration {
|
|
if h == nil {
|
|
return pluginModelRegistration{}
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
registration := h.modelRegistrations[pluginID]
|
|
registration.models = cloneRegistryModels(registration.models)
|
|
return registration
|
|
}
|
|
|
|
func (h *Host) executorProvider(record capabilityRecord, executor pluginapi.ProviderExecutor) (string, bool) {
|
|
provider := h.modelProvider(record.id)
|
|
if provider == "" {
|
|
identifier, okIdentifier := h.callExecutorIdentifier(record.id, executor)
|
|
if !okIdentifier {
|
|
return "", false
|
|
}
|
|
provider = identifier
|
|
}
|
|
provider = strings.ToLower(strings.TrimSpace(provider))
|
|
return provider, provider != ""
|
|
}
|
|
|
|
func (h *Host) callExecutorIdentifier(pluginID string, executor pluginapi.ProviderExecutor) (provider string, ok bool) {
|
|
if h == nil || executor == nil || h.isPluginFused(pluginID) {
|
|
return "", false
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, "Executor.Identifier", recovered)
|
|
provider = ""
|
|
ok = false
|
|
}
|
|
}()
|
|
return executor.Identifier(), true
|
|
}
|
|
|
|
func (h *Host) providerHasNativeExecutor(manager executorManager, provider string) bool {
|
|
if h == nil || manager == nil {
|
|
return false
|
|
}
|
|
existing, okExecutor := manager.Executor(provider)
|
|
return okExecutor && existing != nil && !h.ownsExecutor(existing)
|
|
}
|
|
|
|
func (h *Host) modelHasNativeExecutor(manager executorManager, modelRegistry modelProviderRegistry, modelID string) bool {
|
|
if h == nil || manager == nil || modelRegistry == nil {
|
|
return false
|
|
}
|
|
for _, provider := range modelRegistry.GetModelProviders(modelID) {
|
|
if h.providerHasNativeExecutor(manager, provider) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func appendModelsForProvider(out map[string][]*registry.ModelInfo, provider string, models []*registry.ModelInfo) {
|
|
provider = strings.ToLower(strings.TrimSpace(provider))
|
|
if provider == "" || len(models) == 0 {
|
|
return
|
|
}
|
|
seen := make(map[string]struct{}, len(out[provider])+len(models))
|
|
for _, model := range out[provider] {
|
|
if model != nil && strings.TrimSpace(model.ID) != "" {
|
|
seen[strings.TrimSpace(model.ID)] = struct{}{}
|
|
}
|
|
}
|
|
for _, model := range models {
|
|
if model == nil {
|
|
continue
|
|
}
|
|
modelID := strings.TrimSpace(model.ID)
|
|
if modelID == "" {
|
|
continue
|
|
}
|
|
if _, exists := seen[modelID]; exists {
|
|
continue
|
|
}
|
|
seen[modelID] = struct{}{}
|
|
out[provider] = append(out[provider], cloneRegistryModels([]*registry.ModelInfo{model})...)
|
|
}
|
|
}
|
|
|
|
func (h *Host) ModelsForProvider(provider string) []*registry.ModelInfo {
|
|
if h == nil {
|
|
return nil
|
|
}
|
|
provider = strings.ToLower(strings.TrimSpace(provider))
|
|
if provider == "" {
|
|
return nil
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return cloneRegistryModels(h.providerModels[provider])
|
|
}
|
|
|
|
func (h *Host) HasExecutorCandidateProvider(provider string) bool {
|
|
if h == nil {
|
|
return false
|
|
}
|
|
provider = strings.ToLower(strings.TrimSpace(provider))
|
|
if provider == "" {
|
|
return false
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
executor := record.plugin.Capabilities.Executor
|
|
if executor == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
candidate, okCandidate := h.executorProvider(record, executor)
|
|
if okCandidate && candidate == provider {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *Host) ownsExecutor(executor coreauth.ProviderExecutor) bool {
|
|
adapter, okAdapter := executor.(*executorAdapter)
|
|
return okAdapter && adapter != nil && adapter.host == h
|
|
}
|
|
|
|
func (h *Host) modelProvider(pluginID string) string {
|
|
if h == nil {
|
|
return ""
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.modelProviders[pluginID]
|
|
}
|
|
|
|
func (h *Host) RegisterFrontendAuthProviders() {
|
|
if h == nil {
|
|
return
|
|
}
|
|
|
|
type exclusiveFrontendAuthCandidate struct {
|
|
key string
|
|
pluginID string
|
|
priority int
|
|
}
|
|
|
|
nextKeys := make(map[string]struct{})
|
|
var bestExclusive exclusiveFrontendAuthCandidate
|
|
for _, record := range h.Snapshot().records {
|
|
provider := record.plugin.Capabilities.FrontendAuthProvider
|
|
if provider == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
adapter := &accessAdapter{
|
|
host: h,
|
|
pluginID: record.id,
|
|
provider: provider,
|
|
}
|
|
key := strings.TrimSpace(adapter.Identifier())
|
|
if key == "" {
|
|
continue
|
|
}
|
|
sdkaccess.RegisterProvider(key, adapter)
|
|
nextKeys[key] = struct{}{}
|
|
if record.plugin.Capabilities.FrontendAuthProviderExclusive {
|
|
candidate := exclusiveFrontendAuthCandidate{
|
|
key: key,
|
|
pluginID: record.id,
|
|
priority: record.priority,
|
|
}
|
|
if bestExclusive.key == "" ||
|
|
candidate.priority > bestExclusive.priority ||
|
|
(candidate.priority == bestExclusive.priority && candidate.pluginID < bestExclusive.pluginID) {
|
|
bestExclusive = candidate
|
|
}
|
|
}
|
|
}
|
|
|
|
if bestExclusive.key != "" {
|
|
sdkaccess.SetExclusiveProvider(bestExclusive.key)
|
|
} else {
|
|
sdkaccess.ClearExclusiveProvider()
|
|
}
|
|
h.pruneStaleAccessProviders(nextKeys)
|
|
}
|
|
|
|
func (h *Host) pruneStaleAccessProviders(nextKeys map[string]struct{}) {
|
|
if h == nil {
|
|
return
|
|
}
|
|
|
|
staleKeys := make([]string, 0)
|
|
h.mu.Lock()
|
|
for key := range h.accessProviderKeys {
|
|
if _, okKey := nextKeys[key]; !okKey {
|
|
staleKeys = append(staleKeys, key)
|
|
}
|
|
}
|
|
h.accessProviderKeys = nextKeys
|
|
h.mu.Unlock()
|
|
|
|
for _, key := range staleKeys {
|
|
sdkaccess.UnregisterProvider(key)
|
|
}
|
|
}
|
|
|
|
func (h *Host) RegisterUsagePlugins() {
|
|
if h == nil {
|
|
return
|
|
}
|
|
|
|
for _, record := range h.Snapshot().records {
|
|
plugin := record.plugin.Capabilities.UsagePlugin
|
|
if plugin == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
coreusage.RegisterNamedPlugin("plugin:"+record.id, &usageAdapter{
|
|
host: h,
|
|
pluginID: record.id,
|
|
plugin: plugin,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (h *Host) refreshThinkingProviders(records []capabilityRecord) {
|
|
thinking.ClearPluginProviders()
|
|
if h == nil {
|
|
return
|
|
}
|
|
for _, record := range records {
|
|
applier := record.plugin.Capabilities.ThinkingApplier
|
|
if applier == nil || h.isPluginFused(record.id) {
|
|
continue
|
|
}
|
|
provider, okProvider := h.callThinkingIdentifier(record, applier)
|
|
if !okProvider {
|
|
continue
|
|
}
|
|
thinking.RegisterPluginProvider(record.id, provider, record.priority, &thinkingAdapter{
|
|
host: h,
|
|
pluginID: record.id,
|
|
provider: provider,
|
|
applier: applier,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (h *Host) callThinkingIdentifier(record capabilityRecord, applier pluginapi.ThinkingApplier) (provider string, ok bool) {
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "ThinkingApplier.Identifier", recovered)
|
|
provider = ""
|
|
ok = false
|
|
}
|
|
}()
|
|
provider = strings.ToLower(strings.TrimSpace(applier.Identifier()))
|
|
if provider == "" {
|
|
return "", false
|
|
}
|
|
return provider, true
|
|
}
|
|
|
|
func (h *Host) currentUsagePlugin(pluginID string) pluginapi.UsagePlugin {
|
|
if h == nil || strings.TrimSpace(pluginID) == "" {
|
|
return nil
|
|
}
|
|
for _, record := range h.Snapshot().records {
|
|
if record.id != pluginID {
|
|
continue
|
|
}
|
|
if h.isPluginFused(record.id) {
|
|
return nil
|
|
}
|
|
return record.plugin.Capabilities.UsagePlugin
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *Host) fusePlugin(id, method string, recovered any) {
|
|
if h == nil {
|
|
return
|
|
}
|
|
h.mu.Lock()
|
|
h.fused[id] = fmt.Sprintf("%s panic: %v", method, recovered)
|
|
h.mu.Unlock()
|
|
thinking.UnregisterPluginProviders(id)
|
|
log.WithField("plugin_id", id).WithField("method", method).Errorf("pluginhost: plugin panic recovered: %v\n%s", recovered, debug.Stack())
|
|
}
|
|
|
|
func (h *Host) isPluginFused(id string) bool {
|
|
if h == nil {
|
|
return false
|
|
}
|
|
h.mu.Lock()
|
|
_, fused := h.fused[id]
|
|
h.mu.Unlock()
|
|
return fused
|
|
}
|
|
|
|
type accessAdapter struct {
|
|
host *Host
|
|
pluginID string
|
|
provider pluginapi.FrontendAuthProvider
|
|
}
|
|
|
|
func (a *accessAdapter) Identifier() (identifier string) {
|
|
if a == nil || a.provider == nil {
|
|
return ""
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
if a.host != nil {
|
|
a.host.fusePlugin(a.pluginID, "FrontendAuthProvider.Identifier", recovered)
|
|
}
|
|
identifier = ""
|
|
}
|
|
}()
|
|
pluginID := strings.TrimSpace(a.pluginID)
|
|
providerID := strings.TrimSpace(a.provider.Identifier())
|
|
if pluginID == "" || providerID == "" {
|
|
return ""
|
|
}
|
|
return "plugin:" + pluginID + ":" + providerID
|
|
}
|
|
|
|
func (a *accessAdapter) Authenticate(ctx context.Context, r *http.Request) (result *sdkaccess.Result, authErr *sdkaccess.AuthError) {
|
|
if a == nil || a.provider == nil || a.host.isPluginFused(a.pluginID) {
|
|
return nil, sdkaccess.NewNotHandledError()
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "FrontendAuthProvider.Authenticate", recovered)
|
|
result = nil
|
|
authErr = sdkaccess.NewNotHandledError()
|
|
}
|
|
}()
|
|
|
|
body, errReadAll := readAndRestoreRequestBody(r)
|
|
if errReadAll != nil {
|
|
return nil, sdkaccess.NewInternalAuthError("failed to read plugin auth request body", errReadAll)
|
|
}
|
|
resp, errAuthenticate := a.provider.Authenticate(ctx, pluginapi.FrontendAuthRequest{
|
|
Method: r.Method,
|
|
Path: r.URL.Path,
|
|
Headers: cloneHeader(r.Header),
|
|
Query: cloneValues(r.URL.Query()),
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errAuthenticate != nil || !resp.Authenticated {
|
|
return nil, sdkaccess.NewNotHandledError()
|
|
}
|
|
providerID := a.Identifier()
|
|
if providerID == "" {
|
|
return nil, sdkaccess.NewNotHandledError()
|
|
}
|
|
return &sdkaccess.Result{
|
|
Provider: providerID,
|
|
Principal: resp.Principal,
|
|
Metadata: cloneStringMap(resp.Metadata),
|
|
}, nil
|
|
}
|
|
|
|
type executorAdapter struct {
|
|
host *Host
|
|
pluginID string
|
|
provider string
|
|
executor pluginapi.ProviderExecutor
|
|
inputFormats []sdktranslator.Format
|
|
outputFormats []sdktranslator.Format
|
|
}
|
|
|
|
func (a *executorAdapter) Identifier() string {
|
|
if a == nil {
|
|
return ""
|
|
}
|
|
return a.provider
|
|
}
|
|
|
|
type preparedExecutorCall struct {
|
|
req coreexecutor.Request
|
|
opts coreexecutor.Options
|
|
requestedFormat sdktranslator.Format
|
|
inputFormat sdktranslator.Format
|
|
outputFormat sdktranslator.Format
|
|
}
|
|
|
|
func (a *executorAdapter) prepareExecutorCall(req coreexecutor.Request, opts coreexecutor.Options) (preparedExecutorCall, error) {
|
|
requestedFormat := executorRequestedFormat(req, opts)
|
|
inputFormat, errInput := a.selectExecutorInputFormat(requestedFormat)
|
|
if errInput != nil {
|
|
return preparedExecutorCall{}, errInput
|
|
}
|
|
outputFormat, errOutput := a.selectExecutorOutputFormat(requestedFormat, inputFormat)
|
|
if errOutput != nil {
|
|
return preparedExecutorCall{}, errOutput
|
|
}
|
|
|
|
nativeReq := req
|
|
nativeOpts := opts
|
|
if requestedFormat != "" && requestedFormat != inputFormat {
|
|
nativeReq.Payload = sdktranslator.TranslateRequest(requestedFormat, inputFormat, req.Model, req.Payload, opts.Stream)
|
|
}
|
|
nativeReq.Format = outputFormat
|
|
nativeOpts.SourceFormat = inputFormat
|
|
|
|
return preparedExecutorCall{
|
|
req: nativeReq,
|
|
opts: nativeOpts,
|
|
requestedFormat: requestedFormat,
|
|
inputFormat: inputFormat,
|
|
outputFormat: outputFormat,
|
|
}, nil
|
|
}
|
|
|
|
func (a *executorAdapter) RequestToFormat(req coreexecutor.Request, opts coreexecutor.Options) sdktranslator.Format {
|
|
if a == nil {
|
|
return ""
|
|
}
|
|
requestedFormat := executorRequestedFormat(req, opts)
|
|
inputFormat, errInput := a.selectExecutorInputFormat(requestedFormat)
|
|
if errInput != nil {
|
|
return ""
|
|
}
|
|
return inputFormat
|
|
}
|
|
|
|
func executorRequestedFormat(req coreexecutor.Request, opts coreexecutor.Options) sdktranslator.Format {
|
|
if opts.SourceFormat != "" {
|
|
return normalizeExecutorFormatName(opts.SourceFormat.String())
|
|
}
|
|
if req.Format != "" {
|
|
return normalizeExecutorFormatName(req.Format.String())
|
|
}
|
|
return sdktranslator.FormatOpenAI
|
|
}
|
|
|
|
func (a *executorAdapter) selectExecutorInputFormat(requested sdktranslator.Format) (sdktranslator.Format, error) {
|
|
if len(a.inputFormats) == 0 {
|
|
return "", fmt.Errorf("plugin executor %s declares no input formats", a.Identifier())
|
|
}
|
|
if executorFormatContains(a.inputFormats, requested) {
|
|
return requested, nil
|
|
}
|
|
for _, format := range a.inputFormats {
|
|
if requested == "" || sdktranslator.HasRequestTransformer(requested, format) {
|
|
return format, nil
|
|
}
|
|
}
|
|
return "", fmt.Errorf("plugin executor %s does not support input format %q", a.Identifier(), requested)
|
|
}
|
|
|
|
func (a *executorAdapter) selectExecutorOutputFormat(requested, inputFormat sdktranslator.Format) (sdktranslator.Format, error) {
|
|
if len(a.outputFormats) == 0 {
|
|
return "", fmt.Errorf("plugin executor %s declares no output formats", a.Identifier())
|
|
}
|
|
if executorFormatContains(a.outputFormats, requested) {
|
|
return requested, nil
|
|
}
|
|
if executorFormatContains(a.outputFormats, inputFormat) && executorResponseTranslatorExists(inputFormat, requested) {
|
|
return inputFormat, nil
|
|
}
|
|
for _, format := range a.outputFormats {
|
|
if requested == "" || executorResponseTranslatorExists(format, requested) {
|
|
return format, nil
|
|
}
|
|
}
|
|
return "", fmt.Errorf("plugin executor %s does not support output format %q", a.Identifier(), requested)
|
|
}
|
|
|
|
func executorResponseTranslatorExists(from, to sdktranslator.Format) bool {
|
|
if from == "" || to == "" || from == to {
|
|
return true
|
|
}
|
|
return sdktranslator.HasStreamResponseTransformer(to, from)
|
|
}
|
|
|
|
func (a *executorAdapter) translateExecutorResponse(ctx context.Context, prepared preparedExecutorCall, payload []byte, stream bool, param *any) []byte {
|
|
if prepared.requestedFormat == "" || prepared.outputFormat == prepared.requestedFormat {
|
|
return bytes.Clone(payload)
|
|
}
|
|
originalRequest := prepared.opts.OriginalRequest
|
|
if len(originalRequest) == 0 {
|
|
originalRequest = prepared.req.Payload
|
|
}
|
|
if stream {
|
|
frames := a.translateExecutorStreamPayload(ctx, prepared, payload, param)
|
|
if len(frames) == 0 {
|
|
return nil
|
|
}
|
|
if len(frames) == 1 {
|
|
return bytes.Clone(frames[0])
|
|
}
|
|
return bytes.Join(frames, nil)
|
|
}
|
|
return sdktranslator.TranslateNonStream(ctx, prepared.outputFormat, prepared.requestedFormat, prepared.req.Model, originalRequest, prepared.req.Payload, payload, param)
|
|
}
|
|
|
|
func (a *executorAdapter) translateExecutorStreamChunks(ctx context.Context, prepared preparedExecutorCall, in <-chan pluginapi.ExecutorStreamChunk) <-chan pluginapi.ExecutorStreamChunk {
|
|
if prepared.requestedFormat == "" || prepared.outputFormat == prepared.requestedFormat {
|
|
return in
|
|
}
|
|
if in == nil {
|
|
return nil
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
out := make(chan pluginapi.ExecutorStreamChunk)
|
|
go func() {
|
|
defer close(out)
|
|
var param any
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chunk, ok := <-in:
|
|
if !ok {
|
|
a.emitTranslatedExecutorStreamTail(ctx, prepared, out, ¶m)
|
|
return
|
|
}
|
|
if chunk.Err != nil {
|
|
_ = sendExecutorPluginStreamChunk(ctx, out, chunk)
|
|
continue
|
|
}
|
|
frames := a.translateExecutorStreamPayload(ctx, prepared, chunk.Payload, ¶m)
|
|
for _, frame := range frames {
|
|
if !sendExecutorPluginStreamChunk(ctx, out, pluginapi.ExecutorStreamChunk{Payload: frame}) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return out
|
|
}
|
|
|
|
func (a *executorAdapter) translateExecutorStreamPayload(ctx context.Context, prepared preparedExecutorCall, payload []byte, param *any) [][]byte {
|
|
originalRequest := prepared.opts.OriginalRequest
|
|
if len(originalRequest) == 0 {
|
|
originalRequest = prepared.req.Payload
|
|
}
|
|
frames := sdktranslator.TranslateStream(ctx, prepared.outputFormat, prepared.requestedFormat, prepared.req.Model, originalRequest, prepared.req.Payload, payload, param)
|
|
if executorStreamTranslationFellBack(prepared, payload, frames) {
|
|
return nil
|
|
}
|
|
return frames
|
|
}
|
|
|
|
func executorStreamTranslationFellBack(prepared preparedExecutorCall, payload []byte, frames [][]byte) bool {
|
|
if prepared.requestedFormat == "" || prepared.outputFormat == "" || prepared.outputFormat == prepared.requestedFormat {
|
|
return false
|
|
}
|
|
if len(frames) != 1 || !bytes.Equal(frames[0], payload) {
|
|
return false
|
|
}
|
|
// A plugin executor only reaches this path after host-side response translation
|
|
// has been selected. An unchanged single frame is the SDK registry fallback,
|
|
// not a valid translated frame to send to the client.
|
|
return executorResponseTranslatorExists(prepared.outputFormat, prepared.requestedFormat)
|
|
}
|
|
|
|
func (a *executorAdapter) emitTranslatedExecutorStreamTail(ctx context.Context, prepared preparedExecutorCall, out chan<- pluginapi.ExecutorStreamChunk, param *any) {
|
|
tail := executorStreamDonePayload(prepared.outputFormat)
|
|
if len(tail) == 0 {
|
|
return
|
|
}
|
|
frames := a.translateExecutorStreamPayload(ctx, prepared, tail, param)
|
|
for _, frame := range frames {
|
|
if !sendExecutorPluginStreamChunk(ctx, out, pluginapi.ExecutorStreamChunk{Payload: frame}) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func executorStreamDonePayload(format sdktranslator.Format) []byte {
|
|
switch format {
|
|
case sdktranslator.FormatOpenAI:
|
|
return []byte("data: [DONE]")
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func sendExecutorPluginStreamChunk(ctx context.Context, out chan<- pluginapi.ExecutorStreamChunk, chunk pluginapi.ExecutorStreamChunk) bool {
|
|
select {
|
|
case out <- pluginapi.ExecutorStreamChunk{Payload: bytes.Clone(chunk.Payload), Err: chunk.Err}:
|
|
return true
|
|
case <-ctx.Done():
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (a *executorAdapter) Execute(ctx context.Context, auth *coreauth.Auth, req coreexecutor.Request, opts coreexecutor.Options) (resp coreexecutor.Response, err error) {
|
|
if a == nil || a.executor == nil || a.host.isPluginFused(a.pluginID) {
|
|
return coreexecutor.Response{}, fmt.Errorf("plugin executor %s is unavailable", a.Identifier())
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "Executor.Execute", recovered)
|
|
resp = coreexecutor.Response{}
|
|
err = fmt.Errorf("plugin executor %s panic: %v", a.Identifier(), recovered)
|
|
}
|
|
}()
|
|
|
|
prepared, errPrepare := a.prepareExecutorCall(req, opts)
|
|
if errPrepare != nil {
|
|
return coreexecutor.Response{}, errPrepare
|
|
}
|
|
pluginResp, errExecute := a.executor.Execute(ctx, buildExecutorRequest(a.host, a.provider, auth, prepared.req, prepared.opts))
|
|
if errExecute != nil {
|
|
return coreexecutor.Response{}, errExecute
|
|
}
|
|
return coreexecutor.Response{
|
|
Payload: a.translateExecutorResponse(ctx, prepared, pluginResp.Payload, false, nil),
|
|
Metadata: cloneAnyMap(pluginResp.Metadata),
|
|
Headers: cloneHeader(pluginResp.Headers),
|
|
}, nil
|
|
}
|
|
|
|
func (a *executorAdapter) ExecuteStream(ctx context.Context, auth *coreauth.Auth, req coreexecutor.Request, opts coreexecutor.Options) (result *coreexecutor.StreamResult, err error) {
|
|
if a == nil || a.executor == nil || a.host.isPluginFused(a.pluginID) {
|
|
return nil, fmt.Errorf("plugin executor %s is unavailable", a.Identifier())
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "Executor.ExecuteStream", recovered)
|
|
result = nil
|
|
err = fmt.Errorf("plugin executor %s stream panic: %v", a.Identifier(), recovered)
|
|
}
|
|
}()
|
|
|
|
prepared, errPrepare := a.prepareExecutorCall(req, opts)
|
|
if errPrepare != nil {
|
|
return nil, errPrepare
|
|
}
|
|
pluginResp, errExecuteStream := a.executor.ExecuteStream(ctx, buildExecutorRequest(a.host, a.provider, auth, prepared.req, prepared.opts))
|
|
if errExecuteStream != nil {
|
|
return nil, errExecuteStream
|
|
}
|
|
return &coreexecutor.StreamResult{
|
|
Headers: cloneHeader(pluginResp.Headers),
|
|
Chunks: mapExecutorStreamChunks(ctx, a.translateExecutorStreamChunks(ctx, prepared, pluginResp.Chunks)),
|
|
}, nil
|
|
}
|
|
|
|
func (a *executorAdapter) Refresh(ctx context.Context, auth *coreauth.Auth) (refreshed *coreauth.Auth, err error) {
|
|
if a == nil || a.executor == nil || a.host.isPluginFused(a.pluginID) {
|
|
return nil, fmt.Errorf("plugin executor %s is unavailable", a.Identifier())
|
|
}
|
|
record := a.host.authProviderRecord(authProvider(auth))
|
|
if record == nil || record.plugin.Capabilities.AuthProvider == nil {
|
|
return auth.Clone(), nil
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(record.id, "AuthProvider.RefreshAuth", recovered)
|
|
refreshed = nil
|
|
err = fmt.Errorf("plugin executor %s refresh panic: %v", a.Identifier(), recovered)
|
|
}
|
|
}()
|
|
|
|
pluginResp, errRefresh := record.plugin.Capabilities.AuthProvider.RefreshAuth(ctx, pluginapi.AuthRefreshRequest{
|
|
AuthID: authID(auth),
|
|
AuthProvider: authProvider(auth),
|
|
StorageJSON: storageJSONFromAuth(auth),
|
|
Metadata: cloneAnyMap(authMetadata(auth)),
|
|
Attributes: authAttributes(auth),
|
|
Host: a.host.hostConfigSummary(),
|
|
HTTPClient: a.host.newHTTPClient(auth),
|
|
})
|
|
if errRefresh != nil {
|
|
return nil, errRefresh
|
|
}
|
|
data := pluginResp.Auth
|
|
if strings.TrimSpace(data.Provider) == "" {
|
|
data.Provider = authProvider(auth)
|
|
}
|
|
if strings.TrimSpace(data.ID) == "" {
|
|
data.ID = authID(auth)
|
|
}
|
|
if strings.TrimSpace(data.FileName) == "" && auth != nil {
|
|
data.FileName = auth.FileName
|
|
}
|
|
if strings.TrimSpace(data.Label) == "" && auth != nil {
|
|
data.Label = auth.Label
|
|
}
|
|
if strings.TrimSpace(data.Prefix) == "" && auth != nil {
|
|
data.Prefix = auth.Prefix
|
|
}
|
|
if strings.TrimSpace(data.ProxyURL) == "" && auth != nil {
|
|
data.ProxyURL = auth.ProxyURL
|
|
}
|
|
if len(data.Metadata) == 0 && auth != nil {
|
|
data.Metadata = cloneAnyMap(auth.Metadata)
|
|
}
|
|
if len(data.Attributes) == 0 && auth != nil {
|
|
data.Attributes = cloneStringMap(auth.Attributes)
|
|
}
|
|
if len(data.StorageJSON) == 0 {
|
|
data.StorageJSON = storageJSONFromAuth(auth)
|
|
}
|
|
if pluginResp.NextRefreshAfter.IsZero() && auth != nil {
|
|
data.NextRefreshAfter = auth.NextRefreshAfter
|
|
}
|
|
if !pluginResp.NextRefreshAfter.IsZero() {
|
|
data.NextRefreshAfter = pluginResp.NextRefreshAfter
|
|
}
|
|
next := a.host.AuthDataToCoreAuth(data, "", data.FileName)
|
|
if next == nil {
|
|
return nil, fmt.Errorf("plugin executor %s refresh returned invalid auth data", a.Identifier())
|
|
}
|
|
if auth != nil {
|
|
next.CreatedAt = auth.CreatedAt
|
|
next.UpdatedAt = auth.UpdatedAt
|
|
}
|
|
return next, nil
|
|
}
|
|
|
|
func (a *executorAdapter) CountTokens(ctx context.Context, auth *coreauth.Auth, req coreexecutor.Request, opts coreexecutor.Options) (resp coreexecutor.Response, err error) {
|
|
if a == nil || a.executor == nil || a.host.isPluginFused(a.pluginID) {
|
|
return coreexecutor.Response{}, fmt.Errorf("plugin executor %s is unavailable", a.Identifier())
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "Executor.CountTokens", recovered)
|
|
resp = coreexecutor.Response{}
|
|
err = fmt.Errorf("plugin executor %s count tokens panic: %v", a.Identifier(), recovered)
|
|
}
|
|
}()
|
|
|
|
prepared, errPrepare := a.prepareExecutorCall(req, opts)
|
|
if errPrepare != nil {
|
|
return coreexecutor.Response{}, errPrepare
|
|
}
|
|
pluginResp, errCountTokens := a.executor.CountTokens(ctx, buildExecutorRequest(a.host, a.provider, auth, prepared.req, prepared.opts))
|
|
if errCountTokens != nil {
|
|
return coreexecutor.Response{}, errCountTokens
|
|
}
|
|
return coreexecutor.Response{
|
|
Payload: a.translateExecutorResponse(ctx, prepared, pluginResp.Payload, false, nil),
|
|
Metadata: cloneAnyMap(pluginResp.Metadata),
|
|
Headers: cloneHeader(pluginResp.Headers),
|
|
}, nil
|
|
}
|
|
|
|
func (a *executorAdapter) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (resp *http.Response, err error) {
|
|
if a == nil || a.executor == nil || a.host.isPluginFused(a.pluginID) {
|
|
return nil, fmt.Errorf("plugin executor %s is unavailable", a.Identifier())
|
|
}
|
|
if req == nil {
|
|
return nil, fmt.Errorf("plugin executor %s received nil HTTP request", a.Identifier())
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "Executor.HttpRequest", recovered)
|
|
resp = nil
|
|
err = fmt.Errorf("plugin executor %s http request panic: %v", a.Identifier(), recovered)
|
|
}
|
|
}()
|
|
body, errReadAll := readAndRestoreRequestBody(req)
|
|
if errReadAll != nil {
|
|
return nil, fmt.Errorf("read plugin http request body: %w", errReadAll)
|
|
}
|
|
pluginResp, errHTTPRequest := a.executor.HttpRequest(ctx, pluginapi.ExecutorHTTPRequest{
|
|
AuthID: authID(auth),
|
|
AuthProvider: authProvider(auth),
|
|
Method: req.Method,
|
|
URL: req.URL.String(),
|
|
Headers: cloneHeader(req.Header),
|
|
Body: bytes.Clone(body),
|
|
StorageJSON: storageJSONFromAuth(auth),
|
|
Metadata: cloneAnyMap(authMetadata(auth)),
|
|
Attributes: authAttributes(auth),
|
|
HTTPClient: a.host.newHTTPClient(auth, a.provider),
|
|
})
|
|
if errHTTPRequest != nil {
|
|
return nil, errHTTPRequest
|
|
}
|
|
status := pluginResp.StatusCode
|
|
if status == 0 {
|
|
status = http.StatusOK
|
|
}
|
|
resp = &http.Response{
|
|
StatusCode: status,
|
|
Status: fmt.Sprintf("%d %s", status, http.StatusText(status)),
|
|
Header: cloneHeader(pluginResp.Headers),
|
|
Body: io.NopCloser(bytes.NewReader(bytes.Clone(pluginResp.Body))),
|
|
Request: req,
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
type usageAdapter struct {
|
|
host *Host
|
|
pluginID string
|
|
plugin pluginapi.UsagePlugin
|
|
}
|
|
|
|
type thinkingAdapter struct {
|
|
host *Host
|
|
pluginID string
|
|
provider string
|
|
applier pluginapi.ThinkingApplier
|
|
}
|
|
|
|
func (a *usageAdapter) HandleUsage(ctx context.Context, record coreusage.Record) {
|
|
if a == nil {
|
|
return
|
|
}
|
|
plugin := a.host.currentUsagePlugin(a.pluginID)
|
|
if plugin == nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "UsagePlugin.HandleUsage", recovered)
|
|
}
|
|
}()
|
|
plugin.HandleUsage(ctx, pluginapi.UsageRecord{
|
|
Provider: record.Provider,
|
|
ExecutorType: record.ExecutorType,
|
|
Model: record.Model,
|
|
Alias: record.Alias,
|
|
APIKey: record.APIKey,
|
|
AuthID: record.AuthID,
|
|
AuthIndex: record.AuthIndex,
|
|
AuthType: record.AuthType,
|
|
Source: record.Source,
|
|
ReasoningEffort: record.ReasoningEffort,
|
|
ServiceTier: record.ServiceTier,
|
|
RequestedAt: record.RequestedAt,
|
|
Latency: record.Latency,
|
|
TTFT: record.TTFT,
|
|
Failed: record.Failed,
|
|
Failure: pluginapi.UsageFailure{
|
|
StatusCode: record.Fail.StatusCode,
|
|
Body: record.Fail.Body,
|
|
},
|
|
Detail: pluginapi.UsageDetail{
|
|
InputTokens: record.Detail.InputTokens,
|
|
OutputTokens: record.Detail.OutputTokens,
|
|
ReasoningTokens: record.Detail.ReasoningTokens,
|
|
CachedTokens: record.Detail.CachedTokens,
|
|
CacheReadTokens: record.Detail.CacheReadTokens,
|
|
CacheCreationTokens: record.Detail.CacheCreationTokens,
|
|
TotalTokens: record.Detail.TotalTokens,
|
|
},
|
|
ResponseHeaders: cloneHeader(record.ResponseHeaders),
|
|
})
|
|
}
|
|
|
|
func (a *thinkingAdapter) Apply(body []byte, config thinking.ThinkingConfig, modelInfo *registry.ModelInfo) (out []byte, err error) {
|
|
if a == nil || a.applier == nil || a.host == nil || a.host.isPluginFused(a.pluginID) {
|
|
return bytes.Clone(body), nil
|
|
}
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
a.host.fusePlugin(a.pluginID, "ThinkingApplier.ApplyThinking", recovered)
|
|
out = bytes.Clone(body)
|
|
err = nil
|
|
}
|
|
}()
|
|
resp, errApply := a.applier.ApplyThinking(context.Background(), pluginapi.ThinkingApplyRequest{
|
|
Provider: a.provider,
|
|
Model: registryModelInfoToPluginModelInfo(modelInfo),
|
|
Config: pluginapi.ThinkingConfig{
|
|
Mode: config.Mode.String(),
|
|
Budget: config.Budget,
|
|
Level: string(config.Level),
|
|
},
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errApply != nil || len(resp.Body) == 0 {
|
|
return bytes.Clone(body), nil
|
|
}
|
|
return bytes.Clone(resp.Body), nil
|
|
}
|
|
|
|
func (h *Host) NormalizeRequest(ctx context.Context, from, to sdktranslator.Format, model string, body []byte, stream bool) []byte {
|
|
current := bytes.Clone(body)
|
|
for _, record := range h.Snapshot().records {
|
|
if h.isPluginFused(record.id) || record.plugin.Capabilities.RequestNormalizer == nil {
|
|
continue
|
|
}
|
|
if normalized, ok := h.callRequestNormalizer(ctx, record, from, to, model, current, stream); ok {
|
|
current = normalized
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) TranslateRequest(ctx context.Context, from, to sdktranslator.Format, model string, body []byte, stream bool) ([]byte, bool) {
|
|
for _, record := range h.Snapshot().records {
|
|
if h.isPluginFused(record.id) || record.plugin.Capabilities.RequestTranslator == nil {
|
|
continue
|
|
}
|
|
if translated, ok := h.callRequestTranslator(ctx, record, from, to, model, body, stream); ok {
|
|
return translated, true
|
|
}
|
|
}
|
|
return bytes.Clone(body), false
|
|
}
|
|
|
|
func (h *Host) NormalizeResponseBefore(ctx context.Context, from, to sdktranslator.Format, model string, originalRequestRawJSON, requestRawJSON, body []byte, stream bool) []byte {
|
|
current := bytes.Clone(body)
|
|
for _, record := range h.Snapshot().records {
|
|
normalizer := record.plugin.Capabilities.ResponseBeforeTranslator
|
|
if h.isPluginFused(record.id) || normalizer == nil {
|
|
continue
|
|
}
|
|
if normalized, ok := h.callResponseNormalizer(ctx, record.id, "ResponseBeforeTranslator.NormalizeResponse", normalizer, from, to, model, originalRequestRawJSON, requestRawJSON, current, stream); ok {
|
|
current = normalized
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) TranslateResponse(ctx context.Context, from, to sdktranslator.Format, model string, originalRequestRawJSON, requestRawJSON, body []byte, stream bool) ([]byte, bool) {
|
|
for _, record := range h.Snapshot().records {
|
|
translator := record.plugin.Capabilities.ResponseTranslator
|
|
if h.isPluginFused(record.id) || translator == nil {
|
|
continue
|
|
}
|
|
if translated, ok := h.callResponseTranslator(ctx, record.id, translator, from, to, model, originalRequestRawJSON, requestRawJSON, body, stream); ok {
|
|
return translated, true
|
|
}
|
|
}
|
|
return bytes.Clone(body), false
|
|
}
|
|
|
|
func (h *Host) NormalizeResponseAfter(ctx context.Context, from, to sdktranslator.Format, model string, originalRequestRawJSON, requestRawJSON, body []byte, stream bool) []byte {
|
|
current := bytes.Clone(body)
|
|
for _, record := range h.Snapshot().records {
|
|
normalizer := record.plugin.Capabilities.ResponseAfterTranslator
|
|
if h.isPluginFused(record.id) || normalizer == nil {
|
|
continue
|
|
}
|
|
if normalized, ok := h.callResponseNormalizer(ctx, record.id, "ResponseAfterTranslator.NormalizeResponse", normalizer, from, to, model, originalRequestRawJSON, requestRawJSON, current, stream); ok {
|
|
current = normalized
|
|
}
|
|
}
|
|
return current
|
|
}
|
|
|
|
func (h *Host) callRequestNormalizer(ctx context.Context, record capabilityRecord, from, to sdktranslator.Format, model string, body []byte, stream bool) (out []byte, ok bool) {
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "RequestNormalizer.NormalizeRequest", recovered)
|
|
out = nil
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errNormalizeRequest := record.plugin.Capabilities.RequestNormalizer.NormalizeRequest(ctx, pluginapi.RequestTransformRequest{
|
|
FromFormat: from.String(),
|
|
ToFormat: to.String(),
|
|
Model: model,
|
|
Stream: stream,
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errNormalizeRequest != nil || len(resp.Body) == 0 {
|
|
return nil, false
|
|
}
|
|
return bytes.Clone(resp.Body), true
|
|
}
|
|
|
|
func (h *Host) callRequestTranslator(ctx context.Context, record capabilityRecord, from, to sdktranslator.Format, model string, body []byte, stream bool) (out []byte, ok bool) {
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(record.id, "RequestTranslator.TranslateRequest", recovered)
|
|
out = nil
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errTranslateRequest := record.plugin.Capabilities.RequestTranslator.TranslateRequest(ctx, pluginapi.RequestTransformRequest{
|
|
FromFormat: from.String(),
|
|
ToFormat: to.String(),
|
|
Model: model,
|
|
Stream: stream,
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errTranslateRequest != nil || len(resp.Body) == 0 {
|
|
return nil, false
|
|
}
|
|
return bytes.Clone(resp.Body), true
|
|
}
|
|
|
|
func (h *Host) callResponseNormalizer(ctx context.Context, pluginID, method string, normalizer pluginapi.ResponseNormalizer, from, to sdktranslator.Format, model string, originalRequestRawJSON, requestRawJSON, body []byte, stream bool) (out []byte, ok bool) {
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, method, recovered)
|
|
out = nil
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errNormalizeResponse := normalizer.NormalizeResponse(ctx, pluginapi.ResponseTransformRequest{
|
|
FromFormat: from.String(),
|
|
ToFormat: to.String(),
|
|
Model: model,
|
|
Stream: stream,
|
|
OriginalRequest: bytes.Clone(originalRequestRawJSON),
|
|
TranslatedRequest: bytes.Clone(requestRawJSON),
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errNormalizeResponse != nil || len(resp.Body) == 0 {
|
|
return nil, false
|
|
}
|
|
return bytes.Clone(resp.Body), true
|
|
}
|
|
|
|
func (h *Host) callResponseTranslator(ctx context.Context, pluginID string, translator pluginapi.ResponseTranslator, from, to sdktranslator.Format, model string, originalRequestRawJSON, requestRawJSON, body []byte, stream bool) (out []byte, ok bool) {
|
|
defer func() {
|
|
if recovered := recover(); recovered != nil {
|
|
h.fusePlugin(pluginID, "ResponseTranslator.TranslateResponse", recovered)
|
|
out = nil
|
|
ok = false
|
|
}
|
|
}()
|
|
resp, errTranslateResponse := translator.TranslateResponse(ctx, pluginapi.ResponseTransformRequest{
|
|
FromFormat: from.String(),
|
|
ToFormat: to.String(),
|
|
Model: model,
|
|
Stream: stream,
|
|
OriginalRequest: bytes.Clone(originalRequestRawJSON),
|
|
TranslatedRequest: bytes.Clone(requestRawJSON),
|
|
Body: bytes.Clone(body),
|
|
})
|
|
if errTranslateResponse != nil || len(resp.Body) == 0 {
|
|
return nil, false
|
|
}
|
|
return bytes.Clone(resp.Body), true
|
|
}
|
|
|
|
func buildExecutorRequest(host *Host, provider string, auth *coreauth.Auth, req coreexecutor.Request, opts coreexecutor.Options) pluginapi.ExecutorRequest {
|
|
return pluginapi.ExecutorRequest{
|
|
AuthID: authID(auth),
|
|
AuthProvider: authProvider(auth),
|
|
Model: req.Model,
|
|
Format: req.Format.String(),
|
|
Stream: opts.Stream,
|
|
Alt: opts.Alt,
|
|
Headers: cloneHeader(opts.Headers),
|
|
Query: cloneValues(opts.Query),
|
|
OriginalRequest: bytes.Clone(opts.OriginalRequest),
|
|
SourceFormat: opts.SourceFormat.String(),
|
|
Payload: bytes.Clone(req.Payload),
|
|
Metadata: mergeExecutorMetadata(req.Metadata, opts.Metadata),
|
|
StorageJSON: storageJSONFromAuth(auth),
|
|
AuthMetadata: cloneAnyMap(authMetadata(auth)),
|
|
AuthAttributes: authAttributes(auth),
|
|
HTTPClient: host.newHTTPClient(auth, provider),
|
|
}
|
|
}
|
|
|
|
func storageJSONFromAuth(auth *coreauth.Auth) []byte {
|
|
if auth == nil {
|
|
return nil
|
|
}
|
|
if rawProvider, okRaw := auth.Storage.(interface{ RawJSON() []byte }); okRaw {
|
|
return bytes.Clone(rawProvider.RawJSON())
|
|
}
|
|
if len(auth.Metadata) == 0 {
|
|
return nil
|
|
}
|
|
data, errMarshal := json.Marshal(auth.Metadata)
|
|
if errMarshal != nil {
|
|
return nil
|
|
}
|
|
return data
|
|
}
|
|
|
|
func authAttributes(auth *coreauth.Auth) map[string]string {
|
|
if auth == nil {
|
|
return nil
|
|
}
|
|
return cloneStringMap(auth.Attributes)
|
|
}
|
|
|
|
func mergeExecutorMetadata(reqMetadata, optsMetadata map[string]any) map[string]any {
|
|
if len(reqMetadata) == 0 && len(optsMetadata) == 0 {
|
|
return nil
|
|
}
|
|
merged := make(map[string]any, len(reqMetadata)+len(optsMetadata))
|
|
for key, value := range reqMetadata {
|
|
merged[key] = value
|
|
}
|
|
for key, value := range optsMetadata {
|
|
merged[key] = value
|
|
}
|
|
return merged
|
|
}
|
|
|
|
func mapExecutorStreamChunks(ctx context.Context, in <-chan pluginapi.ExecutorStreamChunk) <-chan coreexecutor.StreamChunk {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
out := make(chan coreexecutor.StreamChunk)
|
|
if in == nil {
|
|
close(out)
|
|
return out
|
|
}
|
|
go func() {
|
|
defer close(out)
|
|
for {
|
|
var mapped coreexecutor.StreamChunk
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chunk, ok := <-in:
|
|
if !ok {
|
|
return
|
|
}
|
|
mapped = coreexecutor.StreamChunk{
|
|
Payload: bytes.Clone(chunk.Payload),
|
|
Err: chunk.Err,
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case out <- mapped:
|
|
}
|
|
}
|
|
}()
|
|
return out
|
|
}
|
|
|
|
func readAndRestoreRequestBody(r *http.Request) ([]byte, error) {
|
|
if r == nil || r.Body == nil {
|
|
return nil, nil
|
|
}
|
|
body, errReadAll := io.ReadAll(r.Body)
|
|
if errReadAll != nil {
|
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
|
return nil, errReadAll
|
|
}
|
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
|
return body, nil
|
|
}
|
|
|
|
func authID(auth *coreauth.Auth) string {
|
|
if auth == nil {
|
|
return ""
|
|
}
|
|
return auth.ID
|
|
}
|
|
|
|
func authProvider(auth *coreauth.Auth) string {
|
|
if auth == nil {
|
|
return ""
|
|
}
|
|
return auth.Provider
|
|
}
|
|
|
|
func authMetadata(auth *coreauth.Auth) map[string]any {
|
|
if auth == nil {
|
|
return nil
|
|
}
|
|
return auth.Metadata
|
|
}
|
|
|
|
func cloneHeader(in http.Header) http.Header {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make(http.Header, len(in))
|
|
for key, values := range in {
|
|
out[key] = append([]string(nil), values...)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mergeHeaders(current, updates http.Header, clear []string) http.Header {
|
|
out := cloneHeader(current)
|
|
if out == nil {
|
|
out = make(http.Header)
|
|
}
|
|
for _, key := range clear {
|
|
out.Del(key)
|
|
}
|
|
for key, values := range updates {
|
|
out.Del(key)
|
|
for _, value := range values {
|
|
out.Add(key, value)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func cloneByteSlices(in [][]byte) [][]byte {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make([][]byte, 0, len(in))
|
|
for _, item := range in {
|
|
out = append(out, bytes.Clone(item))
|
|
}
|
|
return out
|
|
}
|
|
|
|
func cloneValues(in url.Values) url.Values {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make(url.Values, len(in))
|
|
for key, values := range in {
|
|
out[key] = append([]string(nil), values...)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func cloneAnyMap(in map[string]any) map[string]any {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make(map[string]any, len(in))
|
|
for key, value := range in {
|
|
out[key] = value
|
|
}
|
|
return out
|
|
}
|
|
|
|
func cloneInterceptorMetadata(in map[string]any) map[string]any {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
visited := make(map[metadataCloneVisit]reflect.Value)
|
|
out := make(map[string]any, len(in))
|
|
for key, value := range in {
|
|
out[key] = cloneInterceptorMetadataAny(reflect.ValueOf(value), visited)
|
|
}
|
|
return out
|
|
}
|
|
|
|
type metadataCloneVisit struct {
|
|
typ reflect.Type
|
|
ptr uintptr
|
|
}
|
|
|
|
func cloneInterceptorMetadataAny(value reflect.Value, visited map[metadataCloneVisit]reflect.Value) any {
|
|
cloned := cloneInterceptorMetadataReflectValue(value, visited)
|
|
if !cloned.IsValid() {
|
|
return nil
|
|
}
|
|
return cloned.Interface()
|
|
}
|
|
|
|
func cloneInterceptorMetadataReflectValue(value reflect.Value, visited map[metadataCloneVisit]reflect.Value) reflect.Value {
|
|
if !value.IsValid() {
|
|
return reflect.Value{}
|
|
}
|
|
|
|
switch value.Kind() {
|
|
case reflect.Interface:
|
|
if value.IsNil() {
|
|
return reflect.Zero(value.Type())
|
|
}
|
|
return cloneInterceptorMetadataReflectValue(value.Elem(), visited)
|
|
case reflect.Pointer:
|
|
if value.IsNil() {
|
|
return reflect.Zero(value.Type())
|
|
}
|
|
visit := metadataCloneVisit{typ: value.Type(), ptr: value.Pointer()}
|
|
if existing, okExisting := visited[visit]; okExisting {
|
|
return existing
|
|
}
|
|
out := reflect.New(value.Type().Elem())
|
|
visited[visit] = out
|
|
clonedElem := cloneInterceptorMetadataReflectValue(value.Elem(), visited)
|
|
if clonedElem.IsValid() {
|
|
outElem := out.Elem()
|
|
if clonedElem.Type().AssignableTo(outElem.Type()) {
|
|
outElem.Set(clonedElem)
|
|
} else if clonedElem.Type().ConvertibleTo(outElem.Type()) {
|
|
outElem.Set(clonedElem.Convert(outElem.Type()))
|
|
}
|
|
}
|
|
return out
|
|
case reflect.Map:
|
|
if value.IsNil() {
|
|
return reflect.Zero(value.Type())
|
|
}
|
|
visit := metadataCloneVisit{typ: value.Type(), ptr: value.Pointer()}
|
|
if existing, okExisting := visited[visit]; okExisting {
|
|
return existing
|
|
}
|
|
out := reflect.MakeMapWithSize(value.Type(), value.Len())
|
|
visited[visit] = out
|
|
iter := value.MapRange()
|
|
for iter.Next() {
|
|
keyValue := adaptClonedValue(iter.Key(), cloneInterceptorMetadataReflectValue(iter.Key(), visited))
|
|
valValue := adaptClonedValue(iter.Value(), cloneInterceptorMetadataReflectValue(iter.Value(), visited))
|
|
out.SetMapIndex(keyValue, valValue)
|
|
}
|
|
return out
|
|
case reflect.Slice:
|
|
if value.IsNil() {
|
|
return reflect.Zero(value.Type())
|
|
}
|
|
if value.Type().Elem().Kind() == reflect.Uint8 {
|
|
out := reflect.MakeSlice(value.Type(), value.Len(), value.Len())
|
|
reflect.Copy(out, value)
|
|
return out
|
|
}
|
|
visit := metadataCloneVisit{typ: value.Type(), ptr: value.Pointer()}
|
|
if existing, okExisting := visited[visit]; okExisting {
|
|
return existing
|
|
}
|
|
out := reflect.MakeSlice(value.Type(), value.Len(), value.Len())
|
|
visited[visit] = out
|
|
for i := 0; i < value.Len(); i++ {
|
|
clonedItem := cloneInterceptorMetadataReflectValue(value.Index(i), visited)
|
|
if !clonedItem.IsValid() {
|
|
continue
|
|
}
|
|
out.Index(i).Set(adaptClonedValue(value.Index(i), clonedItem))
|
|
}
|
|
return out
|
|
case reflect.Array:
|
|
out := reflect.New(value.Type()).Elem()
|
|
for i := 0; i < value.Len(); i++ {
|
|
clonedItem := cloneInterceptorMetadataReflectValue(value.Index(i), visited)
|
|
if !clonedItem.IsValid() {
|
|
continue
|
|
}
|
|
out.Index(i).Set(adaptClonedValue(value.Index(i), clonedItem))
|
|
}
|
|
return out
|
|
case reflect.Struct:
|
|
out := reflect.New(value.Type()).Elem()
|
|
// Preserve unexported fields and deep-clone exported fields on a best-effort basis.
|
|
out.Set(value)
|
|
for i := 0; i < value.NumField(); i++ {
|
|
field := value.Field(i)
|
|
if !out.Field(i).CanSet() {
|
|
continue
|
|
}
|
|
fieldClone := cloneInterceptorMetadataReflectValue(field, visited)
|
|
if !fieldClone.IsValid() {
|
|
continue
|
|
}
|
|
out.Field(i).Set(adaptClonedValue(field, fieldClone))
|
|
}
|
|
return out
|
|
default:
|
|
return value
|
|
}
|
|
}
|
|
|
|
func adaptClonedValue(original, cloned reflect.Value) reflect.Value {
|
|
if !cloned.IsValid() {
|
|
return original
|
|
}
|
|
if cloned.Type().AssignableTo(original.Type()) {
|
|
return cloned
|
|
}
|
|
if cloned.Type().ConvertibleTo(original.Type()) {
|
|
return cloned.Convert(original.Type())
|
|
}
|
|
return original
|
|
}
|
|
|
|
func cloneStringMap(in map[string]string) map[string]string {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make(map[string]string, len(in))
|
|
for key, value := range in {
|
|
out[key] = value
|
|
}
|
|
return out
|
|
}
|