package pluginhost import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginabi" "github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi" ) type rpcPluginAdapter struct { id string host *Host client pluginClient } type rpcAuthProvider struct { *rpcPluginAdapter } type rpcFrontendAuthProvider struct { *rpcPluginAdapter } type rpcProviderExecutor struct { *rpcPluginAdapter } type rpcThinkingApplier struct { *rpcPluginAdapter } type rpcResponseNormalizer struct { *rpcPluginAdapter method string } func registerRPCPlugin(ctx context.Context, host *Host, id string, client pluginClient, method string, configYAML []byte) (pluginapi.Plugin, error) { if client == nil { return pluginapi.Plugin{}, fmt.Errorf("plugin client is nil") } resp, errCall := callPlugin[rpcRegistration](ctx, client, method, rpcLifecycleRequest{ConfigYAML: bytes.Clone(configYAML)}) if errCall != nil { return pluginapi.Plugin{}, errCall } adapter := &rpcPluginAdapter{id: id, host: host, client: client} plugin := pluginapi.Plugin{ Metadata: resp.Metadata, Capabilities: pluginapi.Capabilities{ ExecutorModelScope: resp.Capabilities.ExecutorModelScope, ExecutorInputFormats: append([]string(nil), resp.Capabilities.ExecutorInputFormats...), ExecutorOutputFormats: append([]string(nil), resp.Capabilities.ExecutorOutputFormats...), }, } if resp.Capabilities.ModelRegistrar { plugin.Capabilities.ModelRegistrar = adapter } if resp.Capabilities.ModelProvider { plugin.Capabilities.ModelProvider = adapter } if resp.Capabilities.AuthProvider { plugin.Capabilities.AuthProvider = rpcAuthProvider{rpcPluginAdapter: adapter} } if resp.Capabilities.FrontendAuthProvider { plugin.Capabilities.FrontendAuthProvider = rpcFrontendAuthProvider{rpcPluginAdapter: adapter} } if resp.Capabilities.Executor { plugin.Capabilities.Executor = rpcProviderExecutor{rpcPluginAdapter: adapter} } if resp.Capabilities.RequestTranslator { plugin.Capabilities.RequestTranslator = adapter } if resp.Capabilities.RequestNormalizer { plugin.Capabilities.RequestNormalizer = adapter } if resp.Capabilities.ResponseTranslator { plugin.Capabilities.ResponseTranslator = adapter } if resp.Capabilities.ResponseBeforeTranslator { plugin.Capabilities.ResponseBeforeTranslator = rpcResponseNormalizer{rpcPluginAdapter: adapter, method: pluginabi.MethodResponseNormalizeBefore} } if resp.Capabilities.ResponseAfterTranslator { plugin.Capabilities.ResponseAfterTranslator = rpcResponseNormalizer{rpcPluginAdapter: adapter, method: pluginabi.MethodResponseNormalizeAfter} } if resp.Capabilities.ThinkingApplier { plugin.Capabilities.ThinkingApplier = rpcThinkingApplier{rpcPluginAdapter: adapter} } if resp.Capabilities.UsagePlugin { plugin.Capabilities.UsagePlugin = adapter } if resp.Capabilities.CommandLinePlugin { plugin.Capabilities.CommandLinePlugin = adapter } if resp.Capabilities.ManagementAPI { plugin.Capabilities.ManagementAPI = adapter } return plugin, nil } func callPlugin[T any](ctx context.Context, client pluginClient, method string, request any) (T, error) { var zero T rawRequest, errMarshal := json.Marshal(sanitizePluginRequest(request)) if errMarshal != nil { return zero, fmt.Errorf("marshal plugin request %s: %w", method, errMarshal) } rawResp, errCall := client.Call(ctx, method, rawRequest) if errCall != nil { return zero, errCall } var envelope pluginabi.Envelope if errUnmarshal := json.Unmarshal(rawResp, &envelope); errUnmarshal != nil { return zero, fmt.Errorf("decode plugin envelope %s: %w", method, errUnmarshal) } out, errDecode := decodeEnvelopeResult[T](envelope) if errDecode != nil { return zero, fmt.Errorf("decode plugin result %s: %w", method, errDecode) } return out, nil } func sanitizePluginRequest(request any) any { switch req := request.(type) { case pluginapi.AuthLoginStartRequest: req.HTTPClient = nil return req case pluginapi.AuthLoginPollRequest: req.HTTPClient = nil return req case pluginapi.AuthRefreshRequest: req.HTTPClient = nil return req case pluginapi.AuthModelRequest: req.HTTPClient = nil return req case pluginapi.ExecutorRequest: req.HTTPClient = nil return req case pluginapi.ExecutorHTTPRequest: req.HTTPClient = nil return req case rpcExecutorRequest: req.HTTPClient = nil return req default: return request } } func decodeRPCEnvelope[T any](raw []byte) (T, error) { var zero T var envelope pluginabi.Envelope if errUnmarshal := json.Unmarshal(raw, &envelope); errUnmarshal != nil { return zero, errUnmarshal } return decodeEnvelopeResult[T](envelope) } func decodeEnvelopeResult[T any](envelope pluginabi.Envelope) (T, error) { var zero T if !envelope.OK { if envelope.Error != nil { return zero, fmt.Errorf("%s", envelope.Error.Message) } return zero, fmt.Errorf("plugin call failed") } if len(envelope.Result) == 0 { return zero, nil } var out T if errDecode := json.Unmarshal(envelope.Result, &out); errDecode != nil { return zero, errDecode } return out, nil } func marshalRPCEnvelope(result json.RawMessage) ([]byte, error) { if result == nil { result = json.RawMessage(`{}`) } return json.Marshal(pluginabi.Envelope{OK: true, Result: result}) } func marshalRPCError(code, message string) []byte { raw, _ := json.Marshal(pluginabi.Envelope{ OK: false, Error: &pluginabi.Error{ Code: code, Message: message, }, }) return raw } func (a *rpcPluginAdapter) openHostCallbackContext(ctx context.Context) (string, func()) { if a == nil || a.host == nil { return "", func() {} } return a.host.openCallbackContext(ctx) } func (a *rpcPluginAdapter) RegisterModels(ctx context.Context, req pluginapi.ModelRegistrationRequest) (pluginapi.ModelRegistrationResponse, error) { return callPlugin[pluginapi.ModelRegistrationResponse](ctx, a.client, pluginabi.MethodModelRegister, req) } func (a *rpcPluginAdapter) StaticModels(ctx context.Context, req pluginapi.StaticModelRequest) (pluginapi.ModelResponse, error) { return callPlugin[pluginapi.ModelResponse](ctx, a.client, pluginabi.MethodModelStatic, req) } func (a *rpcPluginAdapter) ModelsForAuth(ctx context.Context, req pluginapi.AuthModelRequest) (pluginapi.ModelResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.ModelResponse](ctx, a.client, pluginabi.MethodModelForAuth, rpcAuthModelRequest{ AuthModelRequest: req, HostCallbackID: callbackID, }) } func callPluginIdentifier(client pluginClient, method string) string { resp, errCall := callPlugin[rpcIdentifierResponse](context.Background(), client, method, rpcEmptyResponse{}) if errCall != nil { return "" } return strings.TrimSpace(resp.Identifier) } func (a rpcAuthProvider) Identifier() string { return callPluginIdentifier(a.client, pluginabi.MethodAuthIdentifier) } func (a rpcFrontendAuthProvider) Identifier() string { return callPluginIdentifier(a.client, pluginabi.MethodFrontendAuthIdentifier) } func (a rpcProviderExecutor) Identifier() string { return callPluginIdentifier(a.client, pluginabi.MethodExecutorIdentifier) } func (a rpcThinkingApplier) Identifier() string { return callPluginIdentifier(a.client, pluginabi.MethodThinkingIdentifier) } func (a *rpcPluginAdapter) ParseAuth(ctx context.Context, req pluginapi.AuthParseRequest) (pluginapi.AuthParseResponse, error) { return callPlugin[pluginapi.AuthParseResponse](ctx, a.client, pluginabi.MethodAuthParse, req) } func (a *rpcPluginAdapter) StartLogin(ctx context.Context, req pluginapi.AuthLoginStartRequest) (pluginapi.AuthLoginStartResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.AuthLoginStartResponse](ctx, a.client, pluginabi.MethodAuthLoginStart, rpcAuthLoginStartRequest{ AuthLoginStartRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) PollLogin(ctx context.Context, req pluginapi.AuthLoginPollRequest) (pluginapi.AuthLoginPollResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.AuthLoginPollResponse](ctx, a.client, pluginabi.MethodAuthLoginPoll, rpcAuthLoginPollRequest{ AuthLoginPollRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) RefreshAuth(ctx context.Context, req pluginapi.AuthRefreshRequest) (pluginapi.AuthRefreshResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.AuthRefreshResponse](ctx, a.client, pluginabi.MethodAuthRefresh, rpcAuthRefreshRequest{ AuthRefreshRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) Authenticate(ctx context.Context, req pluginapi.FrontendAuthRequest) (pluginapi.FrontendAuthResponse, error) { return callPlugin[pluginapi.FrontendAuthResponse](ctx, a.client, pluginabi.MethodFrontendAuthAuthenticate, req) } func (a *rpcPluginAdapter) Execute(ctx context.Context, req pluginapi.ExecutorRequest) (pluginapi.ExecutorResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.ExecutorResponse](ctx, a.client, pluginabi.MethodExecutorExecute, rpcExecutorRequest{ ExecutorRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) ExecuteStream(ctx context.Context, req pluginapi.ExecutorRequest) (pluginapi.ExecutorStreamResponse, error) { if a == nil || a.host == nil || a.host.streams == nil { return pluginapi.ExecutorStreamResponse{}, fmt.Errorf("plugin stream bridge is unavailable") } streamID, chunks, cleanup := a.host.streams.open(ctx) callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() rpcReq := rpcExecutorRequest{ ExecutorRequest: req, StreamID: streamID, HostCallbackID: callbackID, } resp, errCall := callPlugin[rpcExecutorStreamResponse](ctx, a.client, pluginabi.MethodExecutorExecuteStream, rpcReq) if errCall != nil { cleanup() return pluginapi.ExecutorStreamResponse{}, errCall } if len(resp.Chunks) > 0 { cleanup() out := make(chan pluginapi.ExecutorStreamChunk, len(resp.Chunks)) for _, chunk := range resp.Chunks { out <- chunk } close(out) return pluginapi.ExecutorStreamResponse{Headers: resp.Headers, Chunks: out}, nil } return pluginapi.ExecutorStreamResponse{Headers: resp.Headers, Chunks: chunks}, nil } func (a *rpcPluginAdapter) CountTokens(ctx context.Context, req pluginapi.ExecutorRequest) (pluginapi.ExecutorResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.ExecutorResponse](ctx, a.client, pluginabi.MethodExecutorCountTokens, rpcExecutorRequest{ ExecutorRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) HttpRequest(ctx context.Context, req pluginapi.ExecutorHTTPRequest) (pluginapi.ExecutorHTTPResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.ExecutorHTTPResponse](ctx, a.client, pluginabi.MethodExecutorHTTPRequest, rpcExecutorHTTPRequest{ ExecutorHTTPRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) TranslateRequest(ctx context.Context, req pluginapi.RequestTransformRequest) (pluginapi.PayloadResponse, error) { return callPlugin[pluginapi.PayloadResponse](ctx, a.client, pluginabi.MethodRequestTranslate, req) } func (a *rpcPluginAdapter) NormalizeRequest(ctx context.Context, req pluginapi.RequestTransformRequest) (pluginapi.PayloadResponse, error) { return callPlugin[pluginapi.PayloadResponse](ctx, a.client, pluginabi.MethodRequestNormalize, req) } func (a *rpcPluginAdapter) TranslateResponse(ctx context.Context, req pluginapi.ResponseTransformRequest) (pluginapi.PayloadResponse, error) { return callPlugin[pluginapi.PayloadResponse](ctx, a.client, pluginabi.MethodResponseTranslate, req) } func (a rpcResponseNormalizer) NormalizeResponse(ctx context.Context, req pluginapi.ResponseTransformRequest) (pluginapi.PayloadResponse, error) { return callPlugin[pluginapi.PayloadResponse](ctx, a.client, a.method, req) } func (a rpcThinkingApplier) ApplyThinking(ctx context.Context, req pluginapi.ThinkingApplyRequest) (pluginapi.PayloadResponse, error) { callbackID, closeCallback := a.openHostCallbackContext(ctx) defer closeCallback() return callPlugin[pluginapi.PayloadResponse](ctx, a.client, pluginabi.MethodThinkingApply, rpcThinkingApplyRequest{ ThinkingApplyRequest: req, HostCallbackID: callbackID, }) } func (a *rpcPluginAdapter) HandleUsage(ctx context.Context, record pluginapi.UsageRecord) { _, _ = callPlugin[rpcEmptyResponse](ctx, a.client, pluginabi.MethodUsageHandle, record) } func (a *rpcPluginAdapter) RegisterCommandLine(ctx context.Context, req pluginapi.CommandLineRegistrationRequest) (pluginapi.CommandLineRegistrationResponse, error) { return callPlugin[pluginapi.CommandLineRegistrationResponse](ctx, a.client, pluginabi.MethodCommandLineRegister, req) } func (a *rpcPluginAdapter) ExecuteCommandLine(ctx context.Context, req pluginapi.CommandLineExecutionRequest) (pluginapi.CommandLineExecutionResponse, error) { return callPlugin[pluginapi.CommandLineExecutionResponse](ctx, a.client, pluginabi.MethodCommandLineExecute, req) } func (a *rpcPluginAdapter) RegisterManagement(ctx context.Context, req pluginapi.ManagementRegistrationRequest) (pluginapi.ManagementRegistrationResponse, error) { resp, errCall := callPlugin[rpcManagementRegistrationResponse](ctx, a.client, pluginabi.MethodManagementRegister, req) if errCall != nil { return pluginapi.ManagementRegistrationResponse{}, errCall } routes := make([]pluginapi.ManagementRoute, 0, len(resp.Routes)) for _, route := range resp.Routes { route.Handler = a routes = append(routes, route) } return pluginapi.ManagementRegistrationResponse{Routes: routes}, nil } func (a *rpcPluginAdapter) HandleManagement(ctx context.Context, req pluginapi.ManagementRequest) (pluginapi.ManagementResponse, error) { return callPlugin[pluginapi.ManagementResponse](ctx, a.client, pluginabi.MethodManagementHandle, req) } func httpResponseFromPlugin(resp pluginapi.ExecutorHTTPResponse, req *http.Request) *http.Response { status := resp.StatusCode if status == 0 { status = http.StatusOK } return &http.Response{ StatusCode: status, Status: fmt.Sprintf("%d %s", status, http.StatusText(status)), Header: cloneHeader(resp.Headers), Body: io.NopCloser(bytes.NewReader(bytes.Clone(resp.Body))), Request: req, } }