Merge pull request #3973 from router-for-me/home

feat(home): enhance plugin management and synchronization
This commit is contained in:
Luis Pater
2026-06-23 21:59:41 +08:00
committed by GitHub
14 changed files with 1057 additions and 32 deletions

View File

@@ -0,0 +1,39 @@
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/router-for-me/CLIProxyAPI/v7/internal/homeplugins"
)
const homePluginStatusReportTimeout = 10 * time.Second
type homePluginStatusClient interface {
RPushPluginStatus(ctx context.Context, payload []byte) error
}
func reportHomePluginStatus(ctx context.Context, client homePluginStatusClient, nodeID string, report homeplugins.SyncReport) error {
if client == nil {
return fmt.Errorf("home plugin status client is unavailable")
}
nodeID = strings.TrimSpace(nodeID)
if nodeID == "" {
return fmt.Errorf("home plugin status node id is empty")
}
report.NodeID = nodeID
report.UpdatedAt = time.Now().UTC()
raw, errMarshal := json.Marshal(report)
if errMarshal != nil {
return errMarshal
}
if ctx == nil {
ctx = context.Background()
}
reportCtx, cancel := context.WithTimeout(ctx, homePluginStatusReportTimeout)
defer cancel()
return client.RPushPluginStatus(reportCtx, raw)
}

View File

@@ -0,0 +1,93 @@
package main
import (
"context"
"encoding/json"
"errors"
"strings"
"testing"
"github.com/router-for-me/CLIProxyAPI/v7/internal/homeplugins"
)
type recordingHomePluginStatusClient struct {
payload []byte
err error
}
func (c *recordingHomePluginStatusClient) RPushPluginStatus(ctx context.Context, payload []byte) error {
c.payload = append([]byte(nil), payload...)
return c.err
}
func TestReportHomePluginStatusPushesNodeReport(t *testing.T) {
client := &recordingHomePluginStatusClient{}
report := homeplugins.SyncReport{
Task: "plugin-sync",
Status: "success",
OK: true,
Plugins: []homeplugins.PluginInstallStatus{{ID: "sample", InstallStatus: "installed"}},
}
if errReport := reportHomePluginStatus(context.Background(), client, " node-1 ", report); errReport != nil {
t.Fatalf("reportHomePluginStatus() error = %v", errReport)
}
var payload homeplugins.SyncReport
if errUnmarshal := json.Unmarshal(client.payload, &payload); errUnmarshal != nil {
t.Fatalf("unmarshal payload: %v", errUnmarshal)
}
if payload.NodeID != "node-1" || !payload.OK || len(payload.Plugins) != 1 {
t.Fatalf("payload = %+v, want node report", payload)
}
if payload.UpdatedAt.IsZero() {
t.Fatal("payload UpdatedAt is zero")
}
}
func TestReportHomePluginStatusPushesEmptyReport(t *testing.T) {
client := &recordingHomePluginStatusClient{}
report := homeplugins.SyncReport{
Task: "plugin-sync",
Status: "success",
OK: true,
Plugins: []homeplugins.PluginInstallStatus{},
}
if errReport := reportHomePluginStatus(context.Background(), client, "node-1", report); errReport != nil {
t.Fatalf("reportHomePluginStatus() error = %v", errReport)
}
var payload homeplugins.SyncReport
if errUnmarshal := json.Unmarshal(client.payload, &payload); errUnmarshal != nil {
t.Fatalf("unmarshal payload: %v", errUnmarshal)
}
if payload.NodeID != "node-1" || len(payload.Plugins) != 0 {
t.Fatalf("payload = %+v, want empty node report", payload)
}
}
func TestReportHomePluginStatusRequiresNodeID(t *testing.T) {
client := &recordingHomePluginStatusClient{}
report := homeplugins.SyncReport{
Plugins: []homeplugins.PluginInstallStatus{{ID: "sample", InstallStatus: "failed"}},
}
errReport := reportHomePluginStatus(context.Background(), client, " ", report)
if errReport == nil || !strings.Contains(errReport.Error(), "node id") {
t.Fatalf("reportHomePluginStatus() error = %v, want node id error", errReport)
}
if len(client.payload) != 0 {
t.Fatalf("client payload = %s, want none", client.payload)
}
}
func TestReportHomePluginStatusPropagatesPushError(t *testing.T) {
client := &recordingHomePluginStatusClient{err: errors.New("push failed")}
report := homeplugins.SyncReport{
Plugins: []homeplugins.PluginInstallStatus{{ID: "sample", InstallStatus: "installed"}},
}
errReport := reportHomePluginStatus(context.Background(), client, "node-1", report)
if !errors.Is(errReport, client.err) {
t.Fatalf("reportHomePluginStatus() error = %v, want push failed", errReport)
}
}

View File

@@ -149,6 +149,8 @@ func main() {
var cfg *config.Config
var isCloudDeploy bool
var configLoadedFromHome bool
var homeClient *home.Client
var homePluginSyncReport homeplugins.SyncReport
var (
usePostgresStore bool
pgStoreDSN string
@@ -278,7 +280,7 @@ func main() {
if homeDisableClusterDiscovery {
homeCfg.DisableClusterDiscovery = true
}
homeClient := home.New(homeCfg)
homeClient = home.New(homeCfg)
defer homeClient.Close()
ctxHomeConfig, cancelHomeConfig := context.WithTimeout(context.Background(), 30*time.Second)
@@ -301,10 +303,17 @@ func main() {
parsed.Port = 8317 // Default to 8317 for home mode, can be overridden by home config
parsed.UsageStatisticsEnabled = true
ctxHomePlugins, cancelHomePlugins := context.WithTimeout(context.Background(), 30*time.Second)
errHomePlugins := homeplugins.Sync(ctxHomePlugins, parsed, pluginHost)
var errHomePlugins error
homePluginSyncReport, errHomePlugins = homeplugins.SyncWithReport(ctxHomePlugins, parsed, pluginHost)
cancelHomePlugins()
errReportPlugins := reportHomePluginStatus(context.Background(), homeClient, homeCfg.NodeID, homePluginSyncReport)
if errHomePlugins != nil {
log.Errorf("failed to fetch plugins from home: %v", errHomePlugins)
}
if errReportPlugins != nil {
log.Warnf("failed to report home plugin sync status: %v", errReportPlugins)
}
if errHomePlugins != nil {
return
}
cfg = parsed
@@ -559,6 +568,19 @@ func main() {
// Register built-in access providers before constructing services.
configaccess.Register(&cfg.SDKConfig)
pluginHost.ApplyConfig(context.Background(), cfg)
if configLoadedFromHome {
errHomePluginLoad := homeplugins.MarkLoadResults(&homePluginSyncReport, pluginHost)
errReportPlugins := reportHomePluginStatus(context.Background(), homeClient, cfg.Home.NodeID, homePluginSyncReport)
if errHomePluginLoad != nil {
log.Errorf("failed to load home plugins: %v", errHomePluginLoad)
}
if errReportPlugins != nil {
log.Warnf("failed to report home plugin load status: %v", errReportPlugins)
}
if errHomePluginLoad != nil {
return
}
}
if pluginHost.HasTriggeredCommandLineFlags() {
if exitCode, handled := pluginHost.ExecuteCommandLine(context.Background(), os.Args[0], os.Args[1:], configFilePath, flag.CommandLine); handled {
if exitCode != 0 {

View File

@@ -3,6 +3,7 @@ package config
// HomeConfig stores runtime-only Home control plane settings from -home-jwt.
type HomeConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
NodeID string `yaml:"-" json:"-"`
Host string `yaml:"host" json:"-"`
Port int `yaml:"port" json:"-"`
DisableClusterDiscovery bool `yaml:"disable-cluster-discovery" json:"-"`

View File

@@ -65,6 +65,7 @@ func ConfigFromJWT(ctx context.Context, rawJWT string) (config.HomeConfig, error
}
return config.HomeConfig{
Enabled: true,
NodeID: strings.TrimSpace(claims.CertificateID),
Host: strings.TrimSpace(claims.IP),
Port: claims.Port,
TLS: config.HomeTLSConfig{

View File

@@ -24,11 +24,13 @@ import (
)
const (
redisKeyConfig = "config"
redisChannelConfig = "config"
redisKeyUsage = "usage"
redisKeyRequestLog = "request-log"
redisKeyAppLog = "app-log"
redisKeyConfig = "config"
redisChannelConfig = "config"
redisKeyUsage = "usage"
redisKeyRequestLog = "request-log"
redisKeyAppLog = "app-log"
redisKeyPluginStatus = "plugin-status"
redisKeyPluginTasks = "plugin-tasks"
homeReconnectInterval = time.Second
homeReconnectFailoverThreshold = 3
@@ -59,6 +61,16 @@ type clusterNodesEnvelope struct {
Nodes []clusterNode `json:"nodes"`
}
type PluginTask struct {
ID uint `json:"id"`
Operation string `json:"operation"`
PluginID string `json:"plugin_id"`
TargetNodeType string `json:"target_node_type,omitempty"`
TargetNodeID string `json:"target_node_id,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type KVSetOptions struct {
EX time.Duration
PX time.Duration
@@ -885,7 +897,40 @@ func (c *Client) RPushAppLog(ctx context.Context, payload []byte) error {
return cmd.RPush(ctx, redisKeyAppLog, payload).Err()
}
func (c *Client) handleSubscriptionPayload(channel string, payload string, onConfig func([]byte) error) error {
func (c *Client) RPushPluginStatus(ctx context.Context, payload []byte) error {
cmd, errClient := c.commandClient()
if errClient != nil {
return errClient
}
if len(payload) == 0 {
return nil
}
return cmd.RPush(ctx, redisKeyPluginStatus, payload).Err()
}
func (c *Client) GetPluginTasks(ctx context.Context) ([]PluginTask, error) {
cmd, errClient := c.commandClient()
if errClient != nil {
return nil, errClient
}
raw, errGet := cmd.Get(ctx, redisKeyPluginTasks).Bytes()
if errors.Is(errGet, redis.Nil) {
return nil, nil
}
if errGet != nil {
return nil, errGet
}
if len(raw) == 0 {
return nil, nil
}
var tasks []PluginTask
if errUnmarshal := json.Unmarshal(raw, &tasks); errUnmarshal != nil {
return nil, errUnmarshal
}
return tasks, nil
}
func (c *Client) handleSubscriptionPayload(ctx context.Context, channel string, payload string, onConfig func([]byte) error) error {
payload = strings.TrimSpace(payload)
if payload == "" {
return nil
@@ -1004,7 +1049,7 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte
if msg == nil {
continue
}
if errApply := c.handleSubscriptionPayload(msg.Channel, msg.Payload, onConfig); errApply != nil {
if errApply := c.handleSubscriptionPayload(ctx, msg.Channel, msg.Payload, onConfig); errApply != nil {
if strings.EqualFold(strings.TrimSpace(msg.Channel), redisChannelCluster) {
log.Warn("failed to apply cluster update from home control center, ignoring")
} else {

View File

@@ -273,6 +273,47 @@ func TestKVMSetUsesStableKeyOrder(t *testing.T) {
}
}
func TestRPushPluginStatusUsesPluginStatusKey(t *testing.T) {
client, commands := newRedisCommandTestClient(t, func(args []string) string {
if len(args) > 0 && strings.EqualFold(args[0], "RPUSH") {
return ":1\r\n"
}
return "-ERR unexpected command\r\n"
})
if errPush := client.RPushPluginStatus(context.Background(), []byte(`{"ok":true}`)); errPush != nil {
t.Fatalf("RPushPluginStatus() error = %v", errPush)
}
got := commands.Last()
want := []string{"rpush", "plugin-status", `{"ok":true}`}
if !reflect.DeepEqual(got, want) {
t.Fatalf("RPUSH command = %#v, want %#v", got, want)
}
}
func TestGetPluginTasksUsesPluginTasksKey(t *testing.T) {
client, commands := newRedisCommandTestClient(t, func(args []string) string {
if len(args) > 0 && strings.EqualFold(args[0], "GET") {
payload := `[{"id":7,"operation":"delete","plugin_id":"sample"}]`
return fmt.Sprintf("$%d\r\n%s\r\n", len(payload), payload)
}
return "-ERR unexpected command\r\n"
})
tasks, errTasks := client.GetPluginTasks(context.Background())
if errTasks != nil {
t.Fatalf("GetPluginTasks() error = %v", errTasks)
}
if len(tasks) != 1 || tasks[0].ID != 7 || tasks[0].Operation != "delete" || tasks[0].PluginID != "sample" {
t.Fatalf("tasks = %+v, want one delete task", tasks)
}
got := commands.Last()
want := []string{"get", "plugin-tasks"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("GET command = %#v, want %#v", got, want)
}
}
type redisCommandLog struct {
mu sync.Mutex
commands [][]string

View File

@@ -2,10 +2,15 @@ package homeplugins
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"time"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/util"
@@ -26,6 +31,57 @@ type PluginRuntime interface {
UnloadPlugin(id string) bool
}
type PluginLoadInspector interface {
PluginRegistered(id string) bool
}
type SyncReport struct {
SchemaVersion int `json:"schema_version"`
TaskID uint `json:"task_id,omitempty"`
Task string `json:"task"`
NodeID string `json:"node_id,omitempty"`
Status string `json:"status"`
Phase string `json:"phase"`
OK bool `json:"ok"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
Platform Platform `json:"platform"`
Plugins []PluginInstallStatus `json:"plugins"`
Error string `json:"error,omitempty"`
}
type PluginInstallStatus struct {
ID string `json:"id"`
Version string `json:"version,omitempty"`
ReleaseTag string `json:"release_tag,omitempty"`
Repository string `json:"repository,omitempty"`
InstallStatus string `json:"install_status"`
LoadStatus string `json:"load_status,omitempty"`
Path string `json:"path,omitempty"`
Skipped bool `json:"skipped,omitempty"`
Overwritten bool `json:"overwritten,omitempty"`
Error string `json:"error,omitempty"`
}
const (
pluginTaskName = "plugin-sync"
pluginDeleteTaskName = "plugin-delete"
pluginTaskStatusOK = "success"
pluginTaskStatusError = "failed"
pluginTaskPhaseInstall = "install"
pluginTaskPhaseLoad = "load"
pluginTaskPhaseDelete = "delete"
pluginInstallStatusInstalled = "installed"
pluginInstallStatusSkipped = "skipped"
pluginInstallStatusFailed = "failed"
pluginInstallStatusDeleted = "deleted"
pluginInstallStatusMissing = "missing"
pluginLoadStatusLoaded = "loaded"
pluginLoadStatusFailed = "failed"
)
// CurrentPlatform reports the platform used by pluginhost discovery.
func CurrentPlatform() Platform {
return Platform{
@@ -53,52 +109,99 @@ func NormalizePlatform(platform Platform) Platform {
}
func Sync(ctx context.Context, cfg *config.Config, pluginRuntime PluginRuntime) error {
return SyncPlatform(ctx, cfg, pluginRuntime, CurrentPlatform())
_, errSync := SyncPlatformWithReport(ctx, cfg, pluginRuntime, CurrentPlatform())
return errSync
}
func SyncPlatform(ctx context.Context, cfg *config.Config, pluginRuntime PluginRuntime, platform Platform) error {
_, errSync := SyncPlatformWithReport(ctx, cfg, pluginRuntime, platform)
return errSync
}
func SyncWithReport(ctx context.Context, cfg *config.Config, pluginRuntime PluginRuntime) (SyncReport, error) {
return SyncPlatformWithReport(ctx, cfg, pluginRuntime, CurrentPlatform())
}
func SyncPlatformWithReport(ctx context.Context, cfg *config.Config, pluginRuntime PluginRuntime, platform Platform) (SyncReport, error) {
if cfg == nil || !cfg.Home.Enabled || !cfg.Plugins.Enabled {
return nil
return newSyncReport(platform), nil
}
platform = NormalizePlatform(platform)
report := newSyncReport(platform)
if platform.GOOS == "" {
return fmt.Errorf("home plugins: goos is required")
errPlatform := fmt.Errorf("home plugins: goos is required")
finishReport(&report, errPlatform)
return report, errPlatform
}
if platform.GOARCH == "" {
return fmt.Errorf("home plugins: goarch is required")
errPlatform := fmt.Errorf("home plugins: goarch is required")
finishReport(&report, errPlatform)
return report, errPlatform
}
report.Platform = platform
root := strings.TrimSpace(cfg.Plugins.Dir)
if root == "" {
root = "plugins"
}
client := newPluginStoreClient(cfg)
for id, item := range cfg.Plugins.Configs {
var syncErrors []error
ids := make([]string, 0, len(cfg.Plugins.Configs))
for id := range cfg.Plugins.Configs {
ids = append(ids, id)
}
sort.Strings(ids)
for _, id := range ids {
item := cfg.Plugins.Configs[id]
if !pluginConfigEnabled(item) {
continue
}
manifest, okManifest, errManifest := storeManifestFromPluginConfig(id, item)
if errManifest != nil {
return errManifest
status := PluginInstallStatus{
ID: strings.TrimSpace(id),
InstallStatus: pluginInstallStatusFailed,
Error: errManifest.Error(),
}
report.Plugins = append(report.Plugins, status)
syncErrors = append(syncErrors, errManifest)
continue
}
if !okManifest {
continue
}
if errSync := installManifest(ctx, client, manifest, root, platform, pluginRuntime); errSync != nil {
return errSync
status := pluginStatusFromManifest(manifest)
result, errSync := installManifest(ctx, client, manifest, root, platform, pluginRuntime)
if errSync != nil {
status.InstallStatus = pluginInstallStatusFailed
status.Error = errSync.Error()
report.Plugins = append(report.Plugins, status)
syncErrors = append(syncErrors, errSync)
continue
}
status.Path = strings.TrimSpace(result.Path)
status.Skipped = result.Skipped
status.Overwritten = result.Overwritten
if result.Skipped {
status.InstallStatus = pluginInstallStatusSkipped
} else {
status.InstallStatus = pluginInstallStatusInstalled
}
report.Plugins = append(report.Plugins, status)
}
return nil
errSync := errors.Join(syncErrors...)
finishReport(&report, errSync)
return report, errSync
}
func installManifest(ctx context.Context, client sdkpluginstore.Client, manifest sdkpluginstore.Manifest, root string, platform Platform, pluginRuntime PluginRuntime) error {
func installManifest(ctx context.Context, client sdkpluginstore.Client, manifest sdkpluginstore.Manifest, root string, platform Platform, pluginRuntime PluginRuntime) (sdkpluginstore.InstallResult, error) {
id := strings.TrimSpace(manifest.ID)
if id == "" {
return fmt.Errorf("home plugins: manifest plugin id is empty")
return sdkpluginstore.InstallResult{}, fmt.Errorf("home plugins: manifest plugin id is empty")
}
pluginIsBusy := func() bool {
return pluginRuntime != nil && pluginRuntime.PluginBusy(id)
}
_, errInstall := client.InstallManifest(ctx, manifest, sdkpluginstore.InstallOptions{
result, errInstall := client.InstallManifest(ctx, manifest, sdkpluginstore.InstallOptions{
PluginsDir: root,
GOOS: platform.GOOS,
GOARCH: platform.GOARCH,
@@ -114,9 +217,232 @@ func installManifest(ctx context.Context, client sdkpluginstore.Client, manifest
},
})
if errInstall != nil {
return fmt.Errorf("home plugins: install %s: %w", id, errInstall)
return sdkpluginstore.InstallResult{}, fmt.Errorf("home plugins: install %s: %w", id, errInstall)
}
return result, nil
}
func DeleteWithReport(ctx context.Context, cfg *config.Config, pluginRuntime PluginRuntime, taskID uint, pluginID string) SyncReport {
_ = ctx
platform := CurrentPlatform()
report := newSyncReport(platform)
report.TaskID = taskID
report.Task = pluginDeleteTaskName
report.Phase = pluginTaskPhaseDelete
pluginID = strings.TrimSpace(pluginID)
status := PluginInstallStatus{ID: pluginID}
if cfg == nil {
status.InstallStatus = pluginInstallStatusFailed
status.Error = "home plugins: config is nil"
report.Plugins = append(report.Plugins, status)
finishReport(&report, errors.New(status.Error))
return report
}
root := strings.TrimSpace(cfg.Plugins.Dir)
if root == "" {
root = "plugins"
}
path, deleted, errDelete := deletePluginArtifact(root, pluginID, pluginRuntime)
status.Path = strings.TrimSpace(path)
switch {
case errDelete != nil:
status.InstallStatus = pluginInstallStatusFailed
status.Error = errDelete.Error()
case deleted:
status.InstallStatus = pluginInstallStatusDeleted
default:
status.InstallStatus = pluginInstallStatusMissing
}
report.Plugins = append(report.Plugins, status)
finishReport(&report, errDelete)
return report
}
func deletePluginArtifact(root string, id string, pluginRuntime PluginRuntime) (string, bool, error) {
id = strings.TrimSpace(id)
if !validPluginFileID(id) {
return "", false, fmt.Errorf("invalid plugin id %q", id)
}
path, errPath := currentPluginFilePath(root, id)
if errPath != nil {
return "", false, errPath
}
if path == "" {
return "", false, nil
}
if pluginRuntime != nil && pluginRuntime.PluginBusy(id) {
if !pluginRuntime.UnloadPlugin(id) && pluginRuntime.PluginBusy(id) {
return path, false, sdkpluginstore.ErrLoadedPluginLocked
}
}
if errRemove := os.Remove(path); errRemove != nil {
if errors.Is(errRemove, os.ErrNotExist) {
return path, false, nil
}
return path, false, errRemove
}
return path, true, nil
}
func currentPluginFilePath(root string, id string) (string, error) {
root = strings.TrimSpace(root)
if root == "" {
root = "plugins"
}
platform := CurrentPlatform()
extension := pluginExtension(platform.GOOS)
for _, dir := range pluginCandidateDirs(root, platform.GOOS, platform.GOARCH, platform.Variant) {
entries, errReadDir := os.ReadDir(dir)
if errReadDir != nil {
if errors.Is(errReadDir, os.ErrNotExist) {
continue
}
return "", errReadDir
}
files := make([]string, 0, len(entries))
for _, entry := range entries {
if entry == nil || !entry.Type().IsRegular() {
continue
}
if strings.HasSuffix(strings.ToLower(entry.Name()), extension) {
files = append(files, filepath.Join(dir, entry.Name()))
}
}
sort.Strings(files)
for _, filePath := range files {
if pluginIDFromPath(filePath) == id {
return filePath, nil
}
}
}
return "", nil
}
func pluginCandidateDirs(root string, goos string, goarch string, variant string) []string {
dirs := make([]string, 0, 3)
if variant != "" {
dirs = append(dirs, filepath.Join(root, goos, goarch+"-"+variant))
}
dirs = append(dirs, filepath.Join(root, goos, goarch))
dirs = append(dirs, root)
return dirs
}
func pluginIDFromPath(path string) string {
base := filepath.Base(path)
lowerBase := strings.ToLower(base)
for _, extension := range []string{".so", ".dylib", ".dll"} {
if strings.HasSuffix(lowerBase, extension) {
return base[:len(base)-len(extension)]
}
}
return base
}
func pluginExtension(goos string) string {
switch strings.ToLower(strings.TrimSpace(goos)) {
case "darwin", "mac", "macos", "osx":
return ".dylib"
case "windows":
return ".dll"
default:
return ".so"
}
}
func validPluginFileID(id string) bool {
id = strings.TrimSpace(id)
if id == "" || id == "." || id == ".." || strings.ContainsAny(id, `/\`) {
return false
}
for _, char := range id {
switch {
case char >= 'a' && char <= 'z':
case char >= 'A' && char <= 'Z':
case char >= '0' && char <= '9':
case char == '-', char == '_', char == '.':
default:
return false
}
}
return true
}
func MarkLoadResults(report *SyncReport, inspector PluginLoadInspector) error {
if report == nil {
return nil
}
report.Phase = pluginTaskPhaseLoad
var loadErrors []error
for index := range report.Plugins {
status := &report.Plugins[index]
if status.InstallStatus == pluginInstallStatusFailed {
if status.LoadStatus == "" {
status.LoadStatus = pluginInstallStatusSkipped
}
if strings.TrimSpace(status.Error) != "" {
loadErrors = append(loadErrors, errors.New(status.Error))
} else {
loadErrors = append(loadErrors, fmt.Errorf("home plugins: plugin %s install failed", status.ID))
}
continue
}
if inspector != nil && inspector.PluginRegistered(status.ID) {
status.LoadStatus = pluginLoadStatusLoaded
continue
}
status.LoadStatus = pluginLoadStatusFailed
errLoad := fmt.Errorf("home plugins: plugin %s installed but not loaded", status.ID)
if strings.TrimSpace(status.Error) == "" {
status.Error = errLoad.Error()
}
loadErrors = append(loadErrors, errLoad)
}
errLoad := errors.Join(loadErrors...)
finishReport(report, errLoad)
return errLoad
}
func newSyncReport(platform Platform) SyncReport {
now := time.Now().UTC()
return SyncReport{
SchemaVersion: 1,
Task: pluginTaskName,
Status: pluginTaskStatusOK,
Phase: pluginTaskPhaseInstall,
OK: true,
StartedAt: now,
UpdatedAt: now,
Platform: NormalizePlatform(platform),
Plugins: []PluginInstallStatus{},
}
}
func finishReport(report *SyncReport, errTask error) {
if report == nil {
return
}
now := time.Now().UTC()
report.FinishedAt = now
report.UpdatedAt = now
report.OK = errTask == nil
if errTask != nil {
report.Status = pluginTaskStatusError
report.Error = errTask.Error()
return
}
report.Status = pluginTaskStatusOK
report.Error = ""
}
func pluginStatusFromManifest(manifest sdkpluginstore.Manifest) PluginInstallStatus {
return PluginInstallStatus{
ID: strings.TrimSpace(manifest.ID),
Version: strings.TrimSpace(manifest.Version),
ReleaseTag: strings.TrimSpace(manifest.ReleaseTag),
Repository: strings.TrimSpace(manifest.Repository),
InstallStatus: pluginInstallStatusFailed,
}
return nil
}
func storeManifestFromPluginConfig(id string, item config.PluginInstanceConfig) (sdkpluginstore.Manifest, bool, error) {

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
@@ -23,6 +24,8 @@ type fakePluginRuntime struct {
unloaded []string
}
type fakePluginLoadInspector map[string]bool
func (r *fakePluginRuntime) PluginBusy(id string) bool {
return r.busy
}
@@ -33,6 +36,10 @@ func (r *fakePluginRuntime) UnloadPlugin(id string) bool {
return true
}
func (i fakePluginLoadInspector) PluginRegistered(id string) bool {
return i[id]
}
func TestSyncPlatformInstallsManifestArtifact(t *testing.T) {
root := t.TempDir()
archiveData := makeZip(t, map[string]string{"sample.dll": "library-data"})
@@ -65,6 +72,87 @@ func TestSyncPlatformInstallsManifestArtifact(t *testing.T) {
}
}
func TestSyncPlatformWithReportRecordsSuccessfulInstall(t *testing.T) {
root := t.TempDir()
archiveData := makeZip(t, map[string]string{"sample.dll": "library-data"})
archiveName := "sample_0.2.0_windows_amd64.zip"
checksum := sha256.Sum256(archiveData)
httpClient := mapHTTPDoer{
"https://api.github.com/repos/owner/sample-plugin/releases/tags/v0.2.0": []byte(`{
"tag_name": "v0.2.0",
"assets": [
{"name": "` + archiveName + `", "browser_download_url": "https://downloads.example/` + archiveName + `"},
{"name": "checksums.txt", "browser_download_url": "https://downloads.example/checksums.txt"}
]
}`),
"https://downloads.example/" + archiveName: archiveData,
"https://downloads.example/checksums.txt": []byte(hex.EncodeToString(checksum[:]) + " " + archiveName + "\n"),
}
restore := replacePluginStoreClientForTest(httpClient)
defer restore()
report, errSync := SyncPlatformWithReport(context.Background(), syncTestConfig(t, root), nil, Platform{GOOS: "windows", GOARCH: "amd64"})
if errSync != nil {
t.Fatalf("SyncPlatformWithReport() error = %v", errSync)
}
if !report.OK || report.Status != pluginTaskStatusOK || report.Phase != pluginTaskPhaseInstall {
t.Fatalf("report status = %+v, want successful install phase", report)
}
if len(report.Plugins) != 1 {
t.Fatalf("report plugins len = %d, want 1", len(report.Plugins))
}
plugin := report.Plugins[0]
if plugin.ID != "sample" || plugin.InstallStatus != pluginInstallStatusInstalled || plugin.Version != "0.2.0" {
t.Fatalf("plugin report = %+v, want installed sample 0.2.0", plugin)
}
if wantPath := filepath.Join(root, "windows", "amd64", "sample.dll"); plugin.Path != wantPath {
t.Fatalf("plugin path = %q, want %q", plugin.Path, wantPath)
}
}
func TestSyncPlatformWithReportRecordsSkippedIdenticalArtifact(t *testing.T) {
root := t.TempDir()
targetDir := filepath.Join(root, "windows", "amd64")
if errMkdir := os.MkdirAll(targetDir, 0o755); errMkdir != nil {
t.Fatalf("MkdirAll() error = %v", errMkdir)
}
target := filepath.Join(targetDir, "sample.dll")
if errWrite := os.WriteFile(target, []byte("library-data"), 0o644); errWrite != nil {
t.Fatalf("WriteFile() error = %v", errWrite)
}
archiveData := makeZip(t, map[string]string{"sample.dll": "library-data"})
archiveName := "sample_0.2.0_windows_amd64.zip"
checksum := sha256.Sum256(archiveData)
httpClient := mapHTTPDoer{
"https://api.github.com/repos/owner/sample-plugin/releases/tags/v0.2.0": []byte(`{
"tag_name": "v0.2.0",
"assets": [
{"name": "` + archiveName + `", "browser_download_url": "https://downloads.example/` + archiveName + `"},
{"name": "checksums.txt", "browser_download_url": "https://downloads.example/checksums.txt"}
]
}`),
"https://downloads.example/" + archiveName: archiveData,
"https://downloads.example/checksums.txt": []byte(hex.EncodeToString(checksum[:]) + " " + archiveName + "\n"),
}
restore := replacePluginStoreClientForTest(httpClient)
defer restore()
report, errSync := SyncPlatformWithReport(context.Background(), syncTestConfig(t, root), nil, Platform{GOOS: "windows", GOARCH: "amd64"})
if errSync != nil {
t.Fatalf("SyncPlatformWithReport() error = %v", errSync)
}
if !report.OK || len(report.Plugins) != 1 {
t.Fatalf("report = %+v, want one successful skipped plugin", report)
}
plugin := report.Plugins[0]
if plugin.ID != "sample" || plugin.InstallStatus != pluginInstallStatusSkipped || !plugin.Skipped {
t.Fatalf("plugin report = %+v, want skipped identical sample", plugin)
}
if plugin.Path != target {
t.Fatalf("plugin path = %q, want %q", plugin.Path, target)
}
}
func TestSyncPlatformSkipsIdenticalBusyPlugin(t *testing.T) {
root := t.TempDir()
targetDir := filepath.Join(root, "windows", "amd64")
@@ -147,6 +235,110 @@ store:
}
}
func TestSyncPlatformWithReportRecordsInvalidManifest(t *testing.T) {
cfg := &config.Config{
Home: config.HomeConfig{Enabled: true},
Plugins: config.PluginsConfig{
Enabled: true,
Dir: t.TempDir(),
Configs: map[string]config.PluginInstanceConfig{
"sample": pluginConfigFromYAML(t, `
enabled: true
store:
id: sample
`),
},
},
}
report, errSync := SyncPlatformWithReport(context.Background(), cfg, nil, Platform{GOOS: "linux", GOARCH: "amd64"})
if errSync == nil {
t.Fatal("SyncPlatformWithReport() error = nil, want invalid manifest")
}
if report.OK || report.Status != pluginTaskStatusError || len(report.Plugins) != 1 {
t.Fatalf("report = %+v, want one failed plugin", report)
}
if report.Plugins[0].ID != "sample" || report.Plugins[0].InstallStatus != pluginInstallStatusFailed || !strings.Contains(report.Plugins[0].Error, "invalid store manifest") {
t.Fatalf("plugin report = %+v, want invalid manifest failure", report.Plugins[0])
}
}
func TestMarkLoadResultsFailsWhenInstalledPluginDidNotLoad(t *testing.T) {
report := SyncReport{
Status: pluginTaskStatusOK,
OK: true,
Phase: pluginTaskPhaseInstall,
Plugins: []PluginInstallStatus{{ID: "sample", InstallStatus: pluginInstallStatusInstalled}},
}
errLoad := MarkLoadResults(&report, fakePluginLoadInspector{})
if errLoad == nil {
t.Fatal("MarkLoadResults() error = nil, want load failure")
}
if report.OK || report.Status != pluginTaskStatusError || report.Phase != pluginTaskPhaseLoad {
t.Fatalf("report = %+v, want failed load phase", report)
}
if report.Plugins[0].LoadStatus != pluginLoadStatusFailed || !strings.Contains(report.Plugins[0].Error, "installed but not loaded") {
t.Fatalf("plugin report = %+v, want load failure", report.Plugins[0])
}
}
func TestMarkLoadResultsPreservesInstallFailure(t *testing.T) {
report := SyncReport{
Status: pluginTaskStatusError,
OK: false,
Phase: pluginTaskPhaseInstall,
Plugins: []PluginInstallStatus{{ID: "sample", InstallStatus: pluginInstallStatusFailed, Error: "install boom"}},
}
errLoad := MarkLoadResults(&report, fakePluginLoadInspector{"sample": true})
if errLoad == nil {
t.Fatal("MarkLoadResults() error = nil, want install failure to remain fatal")
}
if report.OK || report.Status != pluginTaskStatusError {
t.Fatalf("report = %+v, want failed status", report)
}
if report.Plugins[0].LoadStatus != pluginInstallStatusSkipped {
t.Fatalf("load status = %q, want skipped", report.Plugins[0].LoadStatus)
}
}
func TestDeleteWithReportRemovesCurrentPlatformPlugin(t *testing.T) {
root := t.TempDir()
targetDir := filepath.Join(root, runtime.GOOS, runtime.GOARCH)
if errMkdir := os.MkdirAll(targetDir, 0o755); errMkdir != nil {
t.Fatalf("MkdirAll() error = %v", errMkdir)
}
target := filepath.Join(targetDir, "sample"+pluginExtension(runtime.GOOS))
if errWrite := os.WriteFile(target, []byte("library-data"), 0o644); errWrite != nil {
t.Fatalf("WriteFile() error = %v", errWrite)
}
runtimeHost := &fakePluginRuntime{busy: true}
report := DeleteWithReport(context.Background(), syncTestConfig(t, root), runtimeHost, 42, "sample")
if !report.OK || report.TaskID != 42 || report.Task != pluginDeleteTaskName || report.Phase != pluginTaskPhaseDelete {
t.Fatalf("report = %+v, want successful delete task", report)
}
if len(runtimeHost.unloaded) != 1 || runtimeHost.unloaded[0] != "sample" {
t.Fatalf("UnloadPlugin calls = %v, want sample", runtimeHost.unloaded)
}
if len(report.Plugins) != 1 || report.Plugins[0].InstallStatus != pluginInstallStatusDeleted || report.Plugins[0].Path != target {
t.Fatalf("plugin report = %+v, want deleted target", report.Plugins)
}
if _, errStat := os.Stat(target); !os.IsNotExist(errStat) {
t.Fatalf("target stat error = %v, want not exist", errStat)
}
}
func TestDeleteWithReportMissingPluginIsSuccess(t *testing.T) {
report := DeleteWithReport(context.Background(), syncTestConfig(t, t.TempDir()), nil, 7, "missing")
if !report.OK || report.Status != pluginTaskStatusOK {
t.Fatalf("report = %+v, want missing plugin delete success", report)
}
if len(report.Plugins) != 1 || report.Plugins[0].InstallStatus != pluginInstallStatusMissing {
t.Fatalf("plugin report = %+v, want missing status", report.Plugins)
}
}
func syncTestConfig(t *testing.T, root string) *config.Config {
t.Helper()
return &config.Config{

View File

@@ -123,6 +123,9 @@ func TestPluginLoadedTracksLoadedPluginAfterDisabled(t *testing.T) {
if !h.PluginLoaded("alpha") {
t.Fatal("PluginLoaded(alpha) = false, want true after load")
}
if !h.PluginRegistered("alpha") {
t.Fatal("PluginRegistered(alpha) = false, want true after load")
}
if len(h.RegisteredPlugins()) != 1 {
t.Fatalf("RegisteredPlugins() len = %d, want 1", len(h.RegisteredPlugins()))
}
@@ -140,6 +143,9 @@ func TestPluginLoadedTracksLoadedPluginAfterDisabled(t *testing.T) {
if len(h.RegisteredPlugins()) != 0 {
t.Fatalf("RegisteredPlugins() len = %d, want 0 after disable", len(h.RegisteredPlugins()))
}
if h.PluginRegistered("alpha") {
t.Fatal("PluginRegistered(alpha) = true, want false after disable")
}
if !h.PluginLoaded("alpha") {
t.Fatal("PluginLoaded(alpha) = false, want true while library remains loaded")
}

View File

@@ -59,6 +59,27 @@ func (h *Host) RegisteredPlugins() []RegisteredPluginInfo {
return out
}
// PluginRegistered reports whether a plugin is active in the current runtime snapshot.
func (h *Host) PluginRegistered(id string) bool {
if h == nil {
return false
}
id = strings.TrimSpace(id)
if id == "" {
return false
}
snap := h.Snapshot()
if snap == nil || len(snap.records) == 0 {
return false
}
for _, record := range snap.records {
if record.id == id {
return true
}
}
return false
}
func (h *Host) registeredPluginMenus() map[string][]RegisteredPluginMenu {
out := make(map[string][]RegisteredPluginMenu)
if h == nil {

View File

@@ -2,14 +2,132 @@ package cliproxy
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/home"
"github.com/router-for-me/CLIProxyAPI/v7/internal/homeplugins"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
func (s *Service) syncHomePlugins(ctx context.Context, cfg *config.Config) error {
if s == nil || cfg == nil || !cfg.Home.Enabled || !cfg.Plugins.Enabled {
return nil
const homePluginStatusReportTimeout = 10 * time.Second
func (s *Service) syncHomePlugins(ctx context.Context, cfg *config.Config) (homeplugins.SyncReport, string, bool, error) {
if s == nil || cfg == nil || !cfg.Home.Enabled {
return homeplugins.SyncReport{}, "", false, nil
}
return homeplugins.Sync(ctx, cfg, s.pluginHost)
syncKey := homePluginSyncKey(cfg)
if syncKey != "" {
s.homePluginSyncMu.Lock()
if s.homePluginSyncKey == syncKey {
s.homePluginSyncMu.Unlock()
return homeplugins.SyncReport{}, syncKey, false, nil
}
s.homePluginSyncMu.Unlock()
}
report, errSync := homeplugins.SyncWithReport(ctx, cfg, s.pluginHost)
return report, syncKey, true, errSync
}
func (s *Service) markHomePluginsSynced(syncKey string) {
if s == nil || strings.TrimSpace(syncKey) == "" {
return
}
s.homePluginSyncMu.Lock()
s.homePluginSyncKey = syncKey
s.homePluginSyncMu.Unlock()
}
func (s *Service) reportHomePluginStatus(ctx context.Context, cfg *config.Config, report homeplugins.SyncReport) {
if s == nil || cfg == nil {
return
}
if s.homeClient == nil {
log.Warn("failed to report home plugin status: home client is unavailable")
return
}
nodeID := strings.TrimSpace(cfg.Home.NodeID)
if nodeID == "" {
log.Warn("failed to report home plugin status: node id is empty")
return
}
report.NodeID = nodeID
report.UpdatedAt = time.Now().UTC()
raw, errMarshal := json.Marshal(report)
if errMarshal != nil {
log.Warnf("failed to marshal home plugin status: %v", errMarshal)
return
}
if ctx == nil {
ctx = context.Background()
}
reportCtx, cancel := context.WithTimeout(ctx, homePluginStatusReportTimeout)
defer cancel()
if errReport := s.homeClient.RPushPluginStatus(reportCtx, raw); errReport != nil {
log.Warnf("failed to report home plugin status: %v", errReport)
}
}
func (s *Service) processHomePluginTasks(ctx context.Context, cfg *config.Config) {
if s == nil || cfg == nil || !cfg.Home.Enabled || s.homeClient == nil {
return
}
if ctx == nil {
ctx = context.Background()
}
tasks, errTasks := s.homeClient.GetPluginTasks(ctx)
if errTasks != nil {
log.Warnf("failed to fetch home plugin tasks: %v", errTasks)
return
}
for _, task := range tasks {
if !strings.EqualFold(strings.TrimSpace(task.Operation), "delete") {
continue
}
report := s.processHomePluginDeleteTask(ctx, cfg, task)
if !report.OK && strings.TrimSpace(report.Error) != "" {
log.Warnf("failed to process home plugin delete task %d for %s: %v", task.ID, task.PluginID, report.Error)
}
s.reportHomePluginStatus(ctx, cfg, report)
}
}
func (s *Service) processHomePluginDeleteTask(ctx context.Context, cfg *config.Config, task home.PluginTask) homeplugins.SyncReport {
return homeplugins.DeleteWithReport(ctx, cfg, s.pluginHost, task.ID, task.PluginID)
}
func homePluginSyncKey(cfg *config.Config) string {
if cfg == nil || !cfg.Home.Enabled {
return ""
}
hash := sha256.New()
_, _ = fmt.Fprintf(hash, "enabled=%t\ndir=%s\n", cfg.Plugins.Enabled, strings.TrimSpace(cfg.Plugins.Dir))
ids := make([]string, 0, len(cfg.Plugins.Configs))
for id := range cfg.Plugins.Configs {
ids = append(ids, id)
}
sort.Strings(ids)
for _, id := range ids {
item := cfg.Plugins.Configs[id]
enabled := false
if item.Enabled != nil {
enabled = *item.Enabled
}
_, _ = fmt.Fprintf(hash, "plugin=%s\nenabled=%t\npriority=%d\n", strings.TrimSpace(id), enabled, item.Priority)
if item.Raw.Kind != 0 {
raw, errMarshal := yaml.Marshal(&item.Raw)
if errMarshal == nil {
_, _ = hash.Write(raw)
}
}
_, _ = hash.Write([]byte{'\n'})
}
return hex.EncodeToString(hash.Sum(nil))
}

View File

@@ -0,0 +1,103 @@
package cliproxy
import (
"context"
"testing"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/home"
"gopkg.in/yaml.v3"
)
func TestSyncHomePluginsSkipsUnchangedSignature(t *testing.T) {
cfg := &config.Config{}
cfg.Home.Enabled = true
cfg.Plugins.Enabled = true
cfg.Plugins.Configs = map[string]config.PluginInstanceConfig{}
service := &Service{}
_, key, didSync, errSync := service.syncHomePlugins(context.Background(), cfg)
if errSync != nil {
t.Fatalf("syncHomePlugins() error = %v", errSync)
}
if !didSync || key == "" {
t.Fatalf("syncHomePlugins() didSync=%v key=%q, want first sync with key", didSync, key)
}
service.markHomePluginsSynced(key)
_, gotKey, didSync, errSync := service.syncHomePlugins(context.Background(), cfg)
if errSync != nil {
t.Fatalf("syncHomePlugins(second) error = %v", errSync)
}
if didSync || gotKey != key {
t.Fatalf("syncHomePlugins(second) didSync=%v key=%q, want skipped same key %q", didSync, gotKey, key)
}
}
func TestApplyHomeOverlayWarnsOnRuntimePluginSyncFailure(t *testing.T) {
base := &config.Config{}
base.Home.Enabled = true
base.Plugins.Enabled = true
service := &Service{cfg: base}
enabled := true
remote := &config.Config{}
remote.Plugins.Enabled = true
remote.Plugins.Configs = map[string]config.PluginInstanceConfig{
"broken": {
Enabled: &enabled,
Raw: yaml.Node{
Kind: yaml.MappingNode,
Tag: "!!map",
Content: []*yaml.Node{
{Kind: yaml.ScalarNode, Tag: "!!str", Value: "store"},
{
Kind: yaml.MappingNode,
Tag: "!!map",
Content: []*yaml.Node{
{Kind: yaml.ScalarNode, Tag: "!!str", Value: "id"},
{Kind: yaml.ScalarNode, Tag: "!!str", Value: "broken"},
},
},
},
},
},
}
if errApply := service.applyHomeOverlayContext(context.Background(), remote); errApply != nil {
t.Fatalf("applyHomeOverlayContext() error = %v, want warning-only plugin sync failure", errApply)
}
if service.cfg == nil || !service.cfg.Home.Enabled || !service.cfg.Plugins.Enabled {
t.Fatalf("service cfg = %+v, want applied home config despite plugin sync failure", service.cfg)
}
if service.homePluginSyncKey != "" {
t.Fatalf("homePluginSyncKey = %q, want empty after plugin sync failure", service.homePluginSyncKey)
}
}
func TestStartHomeSubscriberDoesNotPreMarkPluginSync(t *testing.T) {
cfg := &config.Config{}
cfg.Home.Enabled = true
cfg.Home.Host = "127.0.0.1"
cfg.Home.Port = 1
cfg.Plugins.Enabled = true
cfg.Plugins.Configs = map[string]config.PluginInstanceConfig{}
service := &Service{cfg: cfg}
ctx, cancel := context.WithCancel(context.Background())
cancel()
service.startHomeSubscriber(ctx)
defer func() {
home.ClearCurrent()
if service.homeCancel != nil {
service.homeCancel()
}
if service.homeClient != nil {
service.homeClient.Close()
}
}()
if service.homePluginSyncKey != "" {
t.Fatalf("homePluginSyncKey = %q, want empty before a successful plugin sync", service.homePluginSyncKey)
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v7/internal/api"
"github.com/router-for-me/CLIProxyAPI/v7/internal/home"
"github.com/router-for-me/CLIProxyAPI/v7/internal/homeplugins"
"github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
"github.com/router-for-me/CLIProxyAPI/v7/internal/pluginhost"
"github.com/router-for-me/CLIProxyAPI/v7/internal/redisqueue"
@@ -103,9 +104,11 @@ type Service struct {
// wsGateway manages websocket Gemini providers.
wsGateway *wsrelay.Manager
homeClient *home.Client
homeCancel context.CancelFunc
homeLogForwarder *logging.HomeAppLogForwarder
homeClient *home.Client
homeCancel context.CancelFunc
homeLogForwarder *logging.HomeAppLogForwarder
homePluginSyncMu sync.Mutex
homePluginSyncKey string
}
const (
@@ -1452,10 +1455,24 @@ func (s *Service) applyHomeOverlayContext(ctx context.Context, remoteCfg *config
forceHomeRuntimeConfig(&merged)
logHomeConfigChanges(baseCfg, &merged)
if errSync := s.syncHomePlugins(ctx, &merged); errSync != nil {
return errSync
report, syncKey, didSync, errSync := s.syncHomePlugins(ctx, &merged)
if didSync {
if errSync != nil {
log.Warnf("failed to sync home plugins: %v", errSync)
}
}
s.applyConfigUpdate(&merged)
if didSync {
errLoad := homeplugins.MarkLoadResults(&report, s.pluginHost)
if errLoad != nil {
log.Warnf("failed to load home plugins after config update: %v", errLoad)
}
s.reportHomePluginStatus(ctx, &merged, report)
if errSync == nil && errLoad == nil {
s.markHomePluginsSynced(syncKey)
}
}
s.processHomePluginTasks(ctx, &merged)
return nil
}