Files
server/plugin/manager.go
eternal-flame-AD 413d4c7f7b move examples to plugin directory
Signed-off-by: eternal-flame-AD <yume@yumechi.jp>
2025-09-16 10:25:32 -05:00

582 lines
16 KiB
Go

package plugin
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"plugin"
"strconv"
"strings"
"sync"
"github.com/gin-gonic/gin"
"github.com/gotify/server/v2/auth"
"github.com/gotify/server/v2/model"
"github.com/gotify/server/v2/plugin/compat"
"gopkg.in/yaml.v3"
papiv2 "github.com/gotify/plugin-api/v2"
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/x509"
"net"
"github.com/gotify/plugin-api/v2/generated/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
)
// Database is the persistence interface the plugin Manager depends on.
//
// Callers in this file check the results of GetPluginConfByUserAndPath,
// GetPluginConfByApplicationID and GetUserByID for nil even when the returned
// error is nil, so implementations may signal "not found" as (nil, nil).
type Database interface {
GetUsers() ([]*model.User, error)
GetPluginConfByUserAndPath(userid uint, path string) (*model.PluginConf, error)
CreatePluginConf(p *model.PluginConf) error
GetPluginConfByApplicationID(appid uint) (*model.PluginConf, error)
UpdatePluginConf(p *model.PluginConf) error
CreateMessage(message *model.Message) error
GetPluginConfByID(id uint) (*model.PluginConf, error)
GetPluginConfByToken(token string) (*model.PluginConf, error)
GetUserByID(id uint) (*model.User, error)
CreateApplication(application *model.Application) error
UpdateApplication(app *model.Application) error
GetApplicationsByUser(userID uint) ([]*model.Application, error)
GetApplicationByToken(token string) (*model.Application, error)
}
// Notifier is informed about newly created messages.
//
// The dispatcher goroutine started by NewManager calls Notify once for every
// plugin message it takes off the manager's message channel.
type Notifier interface {
// Notify delivers message on behalf of the user identified by userID.
Notify(userID uint, message *model.MessageExternal)
}
// ServerVersionInfo carries version metadata about the running server.
// NewServerMux stores it on the ServerMux; nothing else in this file reads it.
type ServerVersionInfo struct {
// NOTE(review): field formats (e.g. semver, commit hash, date layout) are not
// established in this file — confirm against the code that fills them in.
Version string
Commit string
BuildDate string
}
// PluginConnection pairs a plugin's gRPC client connection with the info the
// plugin reported when it was registered through ServerMux.RegisterPlugin.
type PluginConnection struct {
// info is the result of the GetPluginInfo RPC issued at registration time.
info *protobuf.Info
// conn is the gRPC client connection to the plugin.
conn *grpc.ClientConn
}
// ServerMux keeps track of the gRPC plugins registered with the server and the
// TLS material used to talk to them.
//
// NOTE(review): the maps below are accessed without any locking in this file;
// confirm that callers serialize access.
type ServerMux struct {
version ServerVersionInfo
// tlsClient provides the CA certificate, signs plugin CSRs and builds the
// client TLS config used when dialing a plugin.
tlsClient *papiv2.EphemeralTLSClient
// pluginDNSToModulePath maps papiv2.BuildPluginTLSName("*", moduleName) to
// moduleName for every registered plugin.
pluginDNSToModulePath map[string]string
// pluginConnections is keyed by module name.
pluginConnections map[string]PluginConnection
}
// infraTlsCreds wraps gRPC transport credentials and overrides ServerHandshake
// so that the peer certificate's DNS name is resolved to a plugin module path.
type infraTlsCreds struct {
// pluginDNSToModulePath is the lookup table consulted after the handshake.
pluginDNSToModulePath map[string]string
credentials.TransportCredentials
}
// infraTlsAuthInfo is the auth info returned by infraTlsCreds.ServerHandshake:
// the TLS info of the connection plus the module name the peer resolved to.
type infraTlsAuthInfo struct {
moduleName string
credentials.TLSInfo
}
// ServerHandshake runs the wrapped credentials' handshake and then resolves the
// first DNS name of the peer's verified leaf certificate to a plugin module
// path.
//
// On success it returns the authenticated connection and an *infraTlsAuthInfo
// carrying the resolved module name. The connection is closed and an error is
// returned when the wrapped handshake fails, when the auth info is not TLS
// info, when the peer presented no verified certificate with a DNS name, or
// when that DNS name does not belong to a known plugin.
func (c *infraTlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	netConn, authInfo, err := c.TransportCredentials.ServerHandshake(rawConn)
	if err != nil {
		log.Printf("ServerHandshake: error %v", err)
		rawConn.Close()
		return nil, nil, err
	}

	// reject logs the reason, tears the connection down and reports the error.
	reject := func(reason error) (net.Conn, credentials.AuthInfo, error) {
		log.Printf("ServerHandshake: %v", reason)
		netConn.Close()
		rawConn.Close()
		return nil, nil, reason
	}

	protocolInfo, ok := authInfo.(credentials.TLSInfo)
	if !ok {
		return reject(fmt.Errorf("unexpected auth info type %T", authInfo))
	}

	// A peer that did not present a verifiable certificate (or one without DNS
	// names) must be refused instead of crashing the server on an index panic.
	chains := protocolInfo.State.VerifiedChains
	if len(chains) == 0 || len(chains[0]) == 0 || len(chains[0][0].DNSNames) == 0 {
		return reject(errors.New("peer certificate carries no verified DNS name"))
	}

	serverName := chains[0][0].DNSNames[0]
	moduleName, ok := c.pluginDNSToModulePath[serverName]
	if !ok {
		return reject(fmt.Errorf("unknown server name %s", serverName))
	}

	return netConn, &infraTlsAuthInfo{
		moduleName: moduleName,
		TLSInfo:    protocolInfo,
	}, nil
}
// NewServerMux creates a server-side mux backed by a freshly generated
// ephemeral TLS client.
//
// It also generates an ed25519 key, a self-checked certificate request and a
// CA certificate pool; in the code visible here those values are not attached
// to the returned mux. Any failure during setup panics.
func NewServerMux(info ServerVersionInfo) *ServerMux {
	// must aborts construction on the first setup error.
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	tlsClient, err := papiv2.NewEphemeralTLSClient()
	must(err)

	_, infraPriv, err := ed25519.GenerateKey(rand.Reader)
	must(err)

	infraCsrBytes, err := x509.CreateCertificateRequest(rand.Reader, new(x509.CertificateRequest), infraPriv)
	must(err)

	infraCsr, err := x509.ParseCertificateRequest(infraCsrBytes)
	must(err)
	must(infraCsr.CheckSignature())

	caCertPool := x509.NewCertPool()
	caCertPool.AddCert(tlsClient.CACert())

	return &ServerMux{
		version:               info,
		tlsClient:             tlsClient,
		pluginDNSToModulePath: map[string]string{},
		pluginConnections:     map[string]PluginConnection{},
	}
}
// CACert exposes the CA certificate of the mux's TLS client, which is the
// trust anchor for mutual TLS authentication.
func (s *ServerMux) CACert() *x509.Certificate {
	caCert := s.tlsClient.CACert()
	return caCert
}
// SignPluginCSR has the mux's TLS client sign csr on behalf of the plugin
// identified by moduleName and returns whatever the TLS client produced.
func (s *ServerMux) SignPluginCSR(moduleName string, csr *x509.CertificateRequest) ([]byte, error) {
	signed, err := s.tlsClient.SignPluginCSR(moduleName, csr)
	return signed, err
}
// RegisterPlugin dials the plugin listening on target, fetches its plugin info
// and records the connection under moduleName.
//
// It returns the established gRPC client connection. An error is returned when
// a plugin with the same module name is already registered, when the client
// cannot be created, or when the plugin does not answer GetPluginInfo. On
// failure nothing stays registered and the connection is closed, so the call
// can be retried.
func (s *ServerMux) RegisterPlugin(target string, moduleName string) (*grpc.ClientConn, error) {
	// TODO: implement exec and negotiate certificate
	tlsName := papiv2.BuildPluginTLSName("*", moduleName)

	// Check for duplicates before dialing so no connection is created (and
	// leaked) for a registration that is going to be refused anyway.
	if _, exists := s.pluginDNSToModulePath[tlsName]; exists {
		return nil, fmt.Errorf("plugin %s already registered", moduleName)
	}

	grpcConn, err := grpc.NewClient(target, grpc.WithTransportCredentials(credentials.NewTLS(s.tlsClient.ClientTLSConfig(moduleName))))
	if err != nil {
		return nil, err
	}

	s.pluginDNSToModulePath[tlsName] = moduleName

	pluginClient := protobuf.NewPluginClient(grpcConn)
	pluginInfo, err := pluginClient.GetPluginInfo(context.Background(), &emptypb.Empty{})
	if err != nil {
		// Roll back: otherwise the name stays reserved and every retry would
		// fail with "already registered".
		delete(s.pluginDNSToModulePath, tlsName)
		grpcConn.Close()
		return nil, err
	}

	s.pluginConnections[moduleName] = PluginConnection{
		info: pluginInfo,
		conn: grpcConn,
	}
	return grpcConn, nil
}
// GetPluginInfo looks up the info a plugin reported at registration time.
// It fails when no plugin is registered under moduleName.
func (s *ServerMux) GetPluginInfo(moduleName string) (*protobuf.Info, error) {
	if registered, found := s.pluginConnections[moduleName]; found {
		return registered.info, nil
	}
	return nil, fmt.Errorf("plugin %s not registered", moduleName)
}
// GetPluginConnection looks up the gRPC connection used for Server-to-Plugin
// calls. It fails when no plugin is registered under moduleName.
func (s *ServerMux) GetPluginConnection(moduleName string) (*grpc.ClientConn, error) {
	if registered, found := s.pluginConnections[moduleName]; found {
		return registered.conn, nil
	}
	return nil, fmt.Errorf("plugin %s not registered", moduleName)
}
// Close closes the connections to all registered plugins.
//
// Every connection is closed even if an earlier one fails; the first error
// encountered is returned, or nil when all connections closed cleanly.
func (s *ServerMux) Close() error {
	var firstErr error
	for _, registered := range s.pluginConnections {
		if err := registered.conn.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// Manager is an encapsulating layer for plugins; it manages all loaded plugins
// and their per-user instances.
type Manager struct {
// mutex is read-locked by Instance and PluginInfo and write-locked by
// SetPluginEnabled, RemoveUser and InitializeForUserID.
mutex *sync.RWMutex
// instances maps a plugin conf ID to the plugin instance created for it.
instances map[uint]compat.PluginInstance
// plugins maps a module path to the loaded plugin.
plugins map[string]compat.Plugin
// messages receives messages sent by plugin instances; it is drained by the
// goroutine started in NewManager.
messages chan MessageWithUserID
db Database
// mux is the router group under which plugin webhooks are registered.
mux *gin.RouterGroup
}
// NewManager creates a Manager from the given configuration.
//
// It starts a goroutine that stores every message sent by a plugin through db
// and forwards it to notifier, loads all plugins found in directory (nothing is
// loaded when directory is empty) and initializes the plugins for every
// existing user. An error is returned when loading the plugins, listing the
// users or initializing a user fails.
func NewManager(db Database, directory string, mux *gin.RouterGroup, notifier Notifier) (*Manager, error) {
	manager := &Manager{
		mutex:     &sync.RWMutex{},
		instances: map[uint]compat.PluginInstance{},
		plugins:   map[string]compat.Plugin{},
		messages:  make(chan MessageWithUserID),
		db:        db,
		mux:       mux,
	}

	// The dispatcher has to run before plugins are initialized: the channel is
	// unbuffered, so a plugin sending a message while being enabled would
	// otherwise block forever.
	go func() {
		for message := range manager.messages {
			internalMsg := &model.Message{
				ApplicationID: message.Message.ApplicationID,
				Title:         message.Message.Title,
				Date:          message.Message.Date,
				Message:       message.Message.Message,
			}
			// Priority is a pointer; a message without one keeps the zero
			// priority instead of crashing the dispatcher.
			if message.Message.Priority != nil {
				internalMsg.Priority = *message.Message.Priority
			}
			if message.Message.Extras != nil {
				extras, err := json.Marshal(message.Message.Extras)
				if err != nil {
					log.Printf("Could not encode extras of plugin message for user %d: %s", message.UserID, err)
				} else {
					internalMsg.Extras = extras
				}
			}
			// Do not announce a message that was never stored; it would carry
			// no valid ID.
			if err := db.CreateMessage(internalMsg); err != nil {
				log.Printf("Could not store plugin message for user %d: %s", message.UserID, err)
				continue
			}
			message.Message.ID = internalMsg.ID
			notifier.Notify(message.UserID, &message.Message)
		}
	}()

	if err := manager.loadPlugins(directory); err != nil {
		return nil, err
	}

	users, err := manager.db.GetUsers()
	if err != nil {
		return nil, err
	}
	for _, user := range users {
		if err := manager.initializeForUser(*user); err != nil {
			return nil, err
		}
	}
	return manager, nil
}
// ErrAlreadyEnabledOrDisabled is returned by SetPluginEnabled when the plugin
// config is already in the requested enabled/disabled state.
var ErrAlreadyEnabledOrDisabled = errors.New("config is already enabled/disabled")
// applicationExists reports whether the database returns an application for
// token. The lookup error is discarded: any failed lookup counts as "does not
// exist".
func (m *Manager) applicationExists(token string) bool {
	if app, _ := m.db.GetApplicationByToken(token); app != nil {
		return true
	}
	return false
}
// pluginConfExists reports whether the database returns a plugin config for
// token. The lookup error is discarded: any failed lookup counts as "does not
// exist".
func (m *Manager) pluginConfExists(token string) bool {
	if conf, _ := m.db.GetPluginConfByToken(token); conf != nil {
		return true
	}
	return false
}
// SetPluginEnabled enables or disables the plugin instance belonging to the
// plugin config with the given ID and persists the new state.
//
// It returns an error when no instance or no config exists for pluginID,
// ErrAlreadyEnabledOrDisabled when the config is already in the requested
// state, and otherwise any error reported by the instance or the database.
func (m *Manager) SetPluginEnabled(pluginID uint, enabled bool) error {
	instance, err := m.Instance(pluginID)
	if err != nil {
		return errors.New("instance not found")
	}
	conf, err := m.db.GetPluginConfByID(pluginID)
	if err != nil {
		return err
	}
	// Other config lookups in this file are checked for (nil, nil), so a
	// missing config must not be dereferenced here either.
	if conf == nil {
		return fmt.Errorf("plugin config with id %d not found", pluginID)
	}
	if conf.Enabled == enabled {
		return ErrAlreadyEnabledOrDisabled
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	if enabled {
		err = instance.Enable()
	} else {
		err = instance.Disable()
	}
	if err != nil {
		return err
	}

	// The instance might have updated its config while toggling, so prefer a
	// fresh copy; fall back to the one read above if the re-read fails or
	// yields nothing.
	if newConf, err := m.db.GetPluginConfByID(pluginID); err == nil && newConf != nil {
		conf = newConf
	}
	conf.Enabled = enabled
	return m.db.UpdatePluginConf(conf)
}
// PluginInfo returns the info of the plugin loaded under modulePath. For an
// unknown module path a notice is printed and a placeholder info named
// "UNKNOWN" is returned instead.
func (m *Manager) PluginInfo(modulePath string) compat.Info {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	loaded, known := m.plugins[modulePath]
	if !known {
		fmt.Println("Could not get plugin info for", modulePath)
		return compat.Info{
			Name:        "UNKNOWN",
			ModulePath:  modulePath,
			Description: "Oops something went wrong",
		}
	}
	return loaded.PluginInfo()
}
// Instance looks up the plugin instance registered for the plugin config ID.
// It returns an error when there is none.
func (m *Manager) Instance(pluginID uint) (compat.PluginInstance, error) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	instance, found := m.instances[pluginID]
	if !found {
		return nil, errors.New("instance not found")
	}
	return instance, nil
}
// HasInstance reports whether a non-nil plugin instance is registered for the
// given plugin config ID.
func (m *Manager) HasInstance(pluginID uint) bool {
	if instance, err := m.Instance(pluginID); err != nil || instance == nil {
		return false
	}
	return true
}
// RemoveUser disables all enabled plugin instances of a user and removes the
// user's instances from the manager.
//
// Plugins the user has no config for are skipped, as are enabled configs
// without a registered instance. The first database or Disable error aborts
// the removal and is returned.
func (m *Manager) RemoveUser(userID uint) error {
	for _, p := range m.plugins {
		pluginConf, err := m.db.GetPluginConfByUserAndPath(userID, p.PluginInfo().ModulePath)
		if err != nil {
			return err
		}
		if pluginConf == nil {
			continue
		}
		if pluginConf.Enabled {
			inst, err := m.Instance(pluginConf.ID)
			if err != nil {
				continue
			}
			m.mutex.Lock()
			err = inst.Disable()
			m.mutex.Unlock()
			if err != nil {
				return err
			}
		}
		// Instance() reads m.instances under the read lock, so the map must
		// only be modified while holding the write lock.
		m.mutex.Lock()
		delete(m.instances, pluginConf.ID)
		m.mutex.Unlock()
	}
	return nil
}
// pluginFileLoadError reports that a plugin file could not be loaded. It keeps
// the file name and the error that caused the failure.
type pluginFileLoadError struct {
	Filename        string
	UnderlyingError error
}

// Error implements the error interface.
func (c pluginFileLoadError) Error() string {
	return fmt.Sprintf("error while loading plugin %s: %s", c.Filename, c.UnderlyingError)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause of the load failure.
func (c pluginFileLoadError) Unwrap() error {
	return c.UnderlyingError
}
// loadPlugins opens every regular, non-hidden file in directory as a Go plugin
// and registers it with the manager.
//
// An empty directory string disables plugin loading. Sub-directories and files
// whose name starts with "." are skipped. The first file that cannot be opened,
// wrapped or registered aborts loading with a pluginFileLoadError.
func (m *Manager) loadPlugins(directory string) error {
	if directory == "" {
		return nil
	}

	pluginFiles, err := os.ReadDir(directory)
	if err != nil {
		// Name the directory and keep the cause inspectable via errors.Is/As.
		return fmt.Errorf("error while reading directory %s: %w", directory, err)
	}

	for _, f := range pluginFiles {
		if f.IsDir() {
			continue
		}
		name := f.Name()
		if strings.HasPrefix(name, ".") {
			continue
		}

		pluginPath := filepath.Join(directory, name)
		fmt.Println("Loading plugin", pluginPath)

		pRaw, err := plugin.Open(pluginPath)
		if err != nil {
			return pluginFileLoadError{name, err}
		}
		compatPlugin, err := compat.Wrap(pRaw)
		if err != nil {
			return pluginFileLoadError{name, err}
		}
		if err := m.LoadPlugin(compatPlugin); err != nil {
			return pluginFileLoadError{name, err}
		}
	}
	return nil
}
// LoadPlugin registers a compat plugin under its module path. It is exported so
// that plugins can be sideloaded for testing purposes. Registering a second
// plugin with the same module path is an error.
func (m *Manager) LoadPlugin(compatPlugin compat.Plugin) error {
	path := compatPlugin.PluginInfo().ModulePath
	_, duplicate := m.plugins[path]
	if duplicate {
		return fmt.Errorf("plugin with module path %s is present at least twice", path)
	}
	m.plugins[path] = compatPlugin
	return nil
}
// InitializeForUserID initializes all plugin instances for the user with the
// given ID while holding the manager's write lock. It fails when the user
// cannot be loaded or does not exist.
func (m *Manager) InitializeForUserID(userID uint) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	user, err := m.db.GetUserByID(userID)
	switch {
	case err != nil:
		return err
	case user == nil:
		return fmt.Errorf("user with id %d not found", userID)
	}
	return m.initializeForUser(*user)
}
// initializeForUser creates an instance of every loaded plugin for user and
// then refreshes the Internal flag of all of the user's applications.
//
// An application is marked internal when a plugin config references it and the
// plugin for that config's module path is currently loaded. The first error
// from plugin initialization or from the database is returned.
func (m *Manager) initializeForUser(user model.User) error {
	userCtx := compat.UserContext{
		ID:    user.ID,
		Name:  user.Name,
		Admin: user.Admin,
	}

	for _, p := range m.plugins {
		if err := m.initializeSingleUserPlugin(userCtx, p); err != nil {
			return err
		}
	}

	apps, err := m.db.GetApplicationsByUser(user.ID)
	if err != nil {
		return err
	}
	for _, app := range apps {
		conf, err := m.db.GetPluginConfByApplicationID(app.ID)
		if err != nil {
			return err
		}

		internal := false
		if conf != nil {
			_, internal = m.plugins[conf.ModulePath]
		}
		app.Internal = internal

		// Report a failed write like every other database error in this
		// function instead of silently leaving a stale flag behind.
		if err := m.db.UpdateApplication(app); err != nil {
			return err
		}
	}
	return nil
}
// initializeSingleUserPlugin creates an instance of plugin p for one user,
// registers it under its plugin config ID and wires up every capability the
// instance supports (messaging, storage, configuration, webhooks).
//
// A plugin config is created when the user has none yet. If the config is
// enabled but the instance fails to enable, the failure is logged and the
// config is disabled; this does not count as an error. Only database errors
// while loading or creating the config are returned.
func (m *Manager) initializeSingleUserPlugin(userCtx compat.UserContext, p compat.Plugin) error {
	info := p.PluginInfo()
	instance := p.NewPluginInstance(userCtx)

	conf, err := m.db.GetPluginConfByUserAndPath(userCtx.ID, info.ModulePath)
	if err != nil {
		return err
	}
	if conf == nil {
		if conf, err = m.createPluginConf(instance, info, userCtx.ID); err != nil {
			return err
		}
	}
	m.instances[conf.ID] = instance

	if compat.HasSupport(instance, compat.Messenger) {
		handler := redirectToChannel{
			ApplicationID: conf.ApplicationID,
			UserID:        conf.UserID,
			Messages:      m.messages,
		}
		instance.SetMessageHandler(handler)
	}
	if compat.HasSupport(instance, compat.Storager) {
		instance.SetStorageHandler(dbStorageHandler{conf.ID, m.db})
	}
	if compat.HasSupport(instance, compat.Configurer) {
		m.initializeConfigurerForSingleUserPlugin(instance, conf)
	}
	if compat.HasSupport(instance, compat.Webhooker) {
		group := m.mux.Group(conf.Token+"/", requirePluginEnabled(conf.ID, m.db))
		// Substitute the first ":id" placeholder of the base path with the
		// plugin config ID.
		basePath := strings.Replace(group.BasePath(), ":id", strconv.Itoa(int(conf.ID)), 1)
		instance.RegisterWebhook(basePath, group)
	}

	if !conf.Enabled {
		return nil
	}
	if enableErr := instance.Enable(); enableErr != nil {
		// Single user plugin cannot be enabled.
		// Don't panic, disable for now and wait for user to update config.
		log.Printf("Plugin initialize failed for user %s: %s. Disabling now...", userCtx.Name, enableErr.Error())
		conf.Enabled = false
		m.db.UpdatePluginConf(conf)
	}
	return nil
}
// initializeConfigurerForSingleUserPlugin applies the stored configuration of
// pluginConf to instance.
//
// An empty stored config is first replaced by the plugin's default config. If
// the stored config cannot be parsed or the instance rejects it, the plugin
// config is disabled and rewritten to contain the default config followed by
// the original config as comments, and the instance is set up with the default
// config. Database write errors are not reported.
func (m *Manager) initializeConfigurerForSingleUserPlugin(instance compat.PluginInstance, pluginConf *model.PluginConf) {
	if len(pluginConf.Config) == 0 {
		// The Configurer is newly implemented
		// Use the default config
		pluginConf.Config, _ = yaml.Marshal(instance.DefaultConfig())
		m.db.UpdatePluginConf(pluginConf)
	}

	c := instance.DefaultConfig()
	if yaml.Unmarshal(pluginConf.Config, c) == nil && instance.ValidateAndSetConfig(c) == nil {
		return
	}

	pluginConf.Enabled = false
	log.Printf("Plugin %s for user %d failed to initialize because it rejected the current config. It might be outdated. A default config is used and the user would need to enable it again.", pluginConf.ModulePath, pluginConf.UserID)

	// c has been (at least partially) overwritten with the rejected values by
	// Unmarshal, so it must not be written out as "the default". Take a
	// pristine default for both the stored config and the instance.
	defaults := instance.DefaultConfig()

	newConf := bytes.NewBufferString("# Plugin initialization failed because it rejected the current config. It might be outdated.\r\n# A default plugin configuration is used:\r\n")
	d, _ := yaml.Marshal(defaults)
	newConf.Write(d)
	newConf.WriteString("\r\n")
	newConf.WriteString("# The original configuration: \r\n")

	// Keep the rejected config as comments so the user can recover values.
	oldConf := bufio.NewScanner(bytes.NewReader(pluginConf.Config))
	for oldConf.Scan() {
		newConf.WriteString("# ")
		newConf.WriteString(oldConf.Text())
		newConf.WriteString("\r\n")
	}

	pluginConf.Config = newConf.Bytes()
	m.db.UpdatePluginConf(pluginConf)
	instance.ValidateAndSetConfig(defaults)
}
// createPluginConf builds and stores a new plugin config for the given user
// and plugin.
//
// The config gets a freshly generated plugin token. Instances supporting
// configuration get their default config serialized into it. For instances
// supporting messaging an internal application is created first and linked to
// the config. The stored config is returned, or the first database error.
func (m *Manager) createPluginConf(instance compat.PluginInstance, info compat.Info, userID uint) (*model.PluginConf, error) {
	conf := new(model.PluginConf)
	conf.UserID = userID
	conf.ModulePath = info.ModulePath
	conf.Token = auth.GenerateNotExistingToken(auth.GeneratePluginToken, m.pluginConfExists)

	if compat.HasSupport(instance, compat.Configurer) {
		conf.Config, _ = yaml.Marshal(instance.DefaultConfig())
	}

	if compat.HasSupport(instance, compat.Messenger) {
		app := new(model.Application)
		app.Token = auth.GenerateNotExistingToken(auth.GenerateApplicationToken, m.applicationExists)
		app.Name = info.String()
		app.UserID = userID
		app.Internal = true
		app.Description = fmt.Sprintf("auto generated application for %s", info.ModulePath)

		if err := m.db.CreateApplication(app); err != nil {
			return nil, err
		}
		conf.ApplicationID = app.ID
	}

	err := m.db.CreatePluginConf(conf)
	if err != nil {
		return nil, err
	}
	return conf, nil
}