mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-11 08:44:00 +08:00
- Implemented command-line flag registration and execution for plugins with priority-based conflict resolution. - Enabled plugin-owned command-line flag execution and persistence of plugin-auth data. - Added new `Host` methods to support command-line capabilities, including flag normalization, validation, and execution state management. - Introduced unit tests to ensure coverage for command-line plugin functionality, including auth data persistence. - Updated configs to normalize plugins during initialization.
173 lines
4.6 KiB
Go
173 lines
4.6 KiB
Go
package pluginhost
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
|
|
"github.com/router-for-me/CLIProxyAPI/v7/internal/runtime/executor/helps"
|
|
coreauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type hostHTTPClient struct {
|
|
host *Host
|
|
auth *coreauth.Auth
|
|
provider string
|
|
}
|
|
|
|
func (h *Host) newHTTPClient(auth *coreauth.Auth, providers ...string) pluginapi.HostHTTPClient {
|
|
provider := ""
|
|
if len(providers) > 0 {
|
|
provider = providers[0]
|
|
}
|
|
return &hostHTTPClient{host: h, auth: auth, provider: provider}
|
|
}
|
|
|
|
func (c *hostHTTPClient) Do(ctx context.Context, req pluginapi.HTTPRequest) (pluginapi.HTTPResponse, error) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
resp, cfg, errDo := c.doHTTP(ctx, req)
|
|
if errDo != nil {
|
|
return pluginapi.HTTPResponse{}, errDo
|
|
}
|
|
defer func() {
|
|
if errClose := resp.Body.Close(); errClose != nil {
|
|
log.Warnf("pluginhost: response body close error: %v", errClose)
|
|
}
|
|
}()
|
|
helps.RecordAPIResponseMetadata(ctx, cfg, resp.StatusCode, resp.Header.Clone())
|
|
body, errReadAll := io.ReadAll(resp.Body)
|
|
if len(body) > 0 {
|
|
helps.AppendAPIResponseChunk(ctx, cfg, body)
|
|
}
|
|
if errReadAll != nil {
|
|
helps.RecordAPIResponseError(ctx, cfg, errReadAll)
|
|
return pluginapi.HTTPResponse{}, fmt.Errorf("read host http response: %w", errReadAll)
|
|
}
|
|
return pluginapi.HTTPResponse{
|
|
StatusCode: resp.StatusCode,
|
|
Headers: cloneHeader(resp.Header),
|
|
Body: body,
|
|
}, nil
|
|
}
|
|
|
|
func (c *hostHTTPClient) DoStream(ctx context.Context, req pluginapi.HTTPRequest) (pluginapi.HTTPStreamResponse, error) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
resp, cfg, errDo := c.doHTTP(ctx, req)
|
|
if errDo != nil {
|
|
return pluginapi.HTTPStreamResponse{}, errDo
|
|
}
|
|
helps.RecordAPIResponseMetadata(ctx, cfg, resp.StatusCode, resp.Header.Clone())
|
|
chunks := make(chan pluginapi.HTTPStreamChunk)
|
|
go func() {
|
|
defer close(chunks)
|
|
defer func() {
|
|
if errClose := resp.Body.Close(); errClose != nil {
|
|
log.Warnf("pluginhost: stream response body close error: %v", errClose)
|
|
}
|
|
}()
|
|
buf := make([]byte, 32*1024)
|
|
for {
|
|
n, errRead := resp.Body.Read(buf)
|
|
if n > 0 {
|
|
payload := bytes.Clone(buf[:n])
|
|
helps.AppendAPIResponseChunk(ctx, cfg, payload)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chunks <- pluginapi.HTTPStreamChunk{Payload: payload}:
|
|
}
|
|
}
|
|
if errRead != nil {
|
|
if errRead != io.EOF {
|
|
helps.RecordAPIResponseError(ctx, cfg, errRead)
|
|
select {
|
|
case <-ctx.Done():
|
|
case chunks <- pluginapi.HTTPStreamChunk{Err: errRead}:
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return pluginapi.HTTPStreamResponse{
|
|
StatusCode: resp.StatusCode,
|
|
Headers: cloneHeader(resp.Header),
|
|
Chunks: chunks,
|
|
}, nil
|
|
}
|
|
|
|
func (c *hostHTTPClient) doHTTP(ctx context.Context, req pluginapi.HTTPRequest) (*http.Response, *config.Config, error) {
|
|
if c == nil || c.host == nil {
|
|
return nil, nil, fmt.Errorf("host http client is unavailable")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
cfg := c.host.currentRuntimeConfig()
|
|
method := req.Method
|
|
if method == "" {
|
|
method = http.MethodGet
|
|
}
|
|
httpReq, errNewRequest := http.NewRequestWithContext(ctx, method, req.URL, bytes.NewReader(bytes.Clone(req.Body)))
|
|
if errNewRequest != nil {
|
|
return nil, cfg, fmt.Errorf("create host http request: %w", errNewRequest)
|
|
}
|
|
httpReq.Header = cloneHeader(req.Headers)
|
|
c.recordHTTPRequest(ctx, cfg, httpReq, req.Body)
|
|
client := helps.NewProxyAwareHTTPClient(ctx, cfg, c.auth, 0)
|
|
if client == nil {
|
|
client = &http.Client{}
|
|
}
|
|
resp, errDo := client.Do(httpReq)
|
|
if errDo != nil {
|
|
helps.RecordAPIResponseError(ctx, cfg, errDo)
|
|
return nil, cfg, fmt.Errorf("execute host http request: %w", errDo)
|
|
}
|
|
return resp, cfg, nil
|
|
}
|
|
|
|
func (c *hostHTTPClient) recordHTTPRequest(ctx context.Context, cfg *config.Config, req *http.Request, body []byte) {
|
|
if req == nil {
|
|
return
|
|
}
|
|
provider := c.provider
|
|
var authID, authLabel, authType, authValue string
|
|
if c.auth != nil {
|
|
authID = c.auth.ID
|
|
authLabel = c.auth.Label
|
|
authType, authValue = c.auth.AccountInfo()
|
|
if provider == "" {
|
|
provider = c.auth.Provider
|
|
}
|
|
}
|
|
helps.RecordAPIRequest(ctx, cfg, helps.UpstreamRequestLog{
|
|
URL: req.URL.String(),
|
|
Method: req.Method,
|
|
Headers: req.Header.Clone(),
|
|
Body: bytes.Clone(body),
|
|
Provider: provider,
|
|
AuthID: authID,
|
|
AuthLabel: authLabel,
|
|
AuthType: authType,
|
|
AuthValue: authValue,
|
|
})
|
|
}
|
|
|
|
func (h *Host) currentRuntimeConfig() *config.Config {
|
|
if h == nil {
|
|
return nil
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
return h.runtimeConfig
|
|
}
|