From f50f83feb1f29760167a38422e9c55d24ec2dfcf Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Tue, 6 Jan 2026 18:05:41 +0800 Subject: [PATCH] fix: endpoint mode slave (#24016) Co-authored-by: Qiu Jian --- cmd/climc/shell/identity/endpoints.go | 2 +- cmd/climc/shell/misc/pprof.go | 3 +- pkg/ansibleserver/service/handlers.go | 4 +- pkg/ansibleserver/service/service.go | 2 +- pkg/apigateway/handler/backendproxy.go | 5 +- pkg/apigateway/handler/proxy.go | 6 +- pkg/apis/identity/consts.go | 2 + pkg/appsrv/dispatcher/dispatcher.go | 8 +- pkg/appsrv/dispatcher/jointdispatcher.go | 8 +- pkg/baremetal/manager.go | 4 +- pkg/cloudcommon/app/auth.go | 3 +- pkg/cloudcommon/db/quotas/handler.go | 48 +++++----- pkg/cloudcommon/db/taskman/handler.go | 6 +- pkg/cloudcommon/tsdb/tsdb.go | 3 +- pkg/cloudevent/service/handlers.go | 6 +- pkg/cloudevent/service/service.go | 2 +- pkg/cloudid/service/handlers.go | 8 +- pkg/cloudid/service/service.go | 22 ++--- pkg/cloudir/service/handlers.go | 4 +- pkg/cloudir/service/service.go | 2 +- pkg/cloudnet/service/handlers.go | 4 +- pkg/cloudnet/service/service.go | 2 +- pkg/cloudproxy/service/handlers.go | 4 +- pkg/cloudproxy/service/service.go | 2 +- pkg/compute/misc/handler.go | 3 +- pkg/compute/models/buckets.go | 3 +- pkg/compute/models/hosts.go | 3 +- pkg/compute/service/handlers.go | 24 ++--- pkg/compute/service/service.go | 22 +++-- pkg/devtool/service/handler.go | 6 +- pkg/devtool/service/service.go | 2 +- pkg/hostman/hostinfo/hostinfo.go | 7 +- pkg/hostman/storageman/imagecache_local.go | 2 +- pkg/image/service/handlers.go | 8 +- pkg/image/service/service.go | 88 ++++++++++--------- pkg/image/torrent/torrent.go | 5 +- pkg/keystone/models/endpoints.go | 26 +++--- pkg/keystone/service/handlers.go | 8 +- pkg/keystone/service/service.go | 4 +- pkg/llm/service/handler.go | 6 +- pkg/llm/service/service.go | 2 +- pkg/logger/service/handlers.go | 4 +- pkg/logger/service/service.go | 2 +- pkg/mcclient/auth/auth.go | 22 ++--- pkg/mcclient/modulebase/base.go | 4 +- pkg/mcclient/modulebase/modules.go | 5 +- pkg/mcclient/modules/compute/mod_metadatas.go | 3 +- pkg/mcclient/modules/compute/mod_skus.go | 2 +- .../modules/webconsole/mod_webconsole.go | 3 +- pkg/mcclient/session.go | 40 +++++---- pkg/mcclient/token.go | 18 ++-- pkg/mcclient/token3.go | 36 ++++---- pkg/monitor/datasource/datasource.go | 3 +- pkg/monitor/service/handlers.go | 10 +-- pkg/monitor/service/service.go | 6 +- pkg/notify/service/handlers.go | 8 +- pkg/notify/service/service.go | 16 ++-- pkg/scheduledtask/service/handlers.go | 4 +- pkg/scheduledtask/service/service.go | 13 +-- pkg/webconsole/service/handlers.go | 25 +++--- pkg/webconsole/service/service.go | 12 +-- pkg/yunionconf/service/handlers.go | 15 ++-- pkg/yunionconf/service/service.go | 2 +- 63 files changed, 351 insertions(+), 281 deletions(-) diff --git a/cmd/climc/shell/identity/endpoints.go b/cmd/climc/shell/identity/endpoints.go index a5694ef2c7..8c7e691cda 100644 --- a/cmd/climc/shell/identity/endpoints.go +++ b/cmd/climc/shell/identity/endpoints.go @@ -66,7 +66,7 @@ func init() { type EndpointCreateOptions struct { SERVICE string `help:"Service ID or Name"` REGION string `help:"Region"` - INTERFACE string `help:"Interface types" choices:"internal|public|admin|console"` + INTERFACE string `help:"Interface types" choices:"internal|public|admin|console|slave"` URL string `help:"URL"` Zone string `help:"Zone"` Name string `help:"Name"` diff --git a/cmd/climc/shell/misc/pprof.go b/cmd/climc/shell/misc/pprof.go index d5e6dd009b..34032bf3d6 100644 --- a/cmd/climc/shell/misc/pprof.go +++ b/cmd/climc/shell/misc/pprof.go @@ -24,6 +24,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/netutils" "yunion.io/x/pkg/util/signalutils" @@ -76,7 +77,7 @@ func init() { svcUrl string ) if len(opts.Service) > 0 { - svcUrl, err = s.GetServiceURL(opts.Service, "") + svcUrl, err = s.GetServiceURL(opts.Service, "", httputils.GET) if err != nil { return errors.Wrapf(err, "get service %s url", opts.Service) } diff --git a/pkg/ansibleserver/service/handlers.go b/pkg/ansibleserver/service/handlers.go index aed35209ec..2ccfb2d0e3 100644 --- a/pkg/ansibleserver/service/handlers.go +++ b/pkg/ansibleserver/service/handlers.go @@ -35,7 +35,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/db" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegisterModelManager(db.OpsLog) @@ -51,6 +51,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/ansibleserver/service/service.go b/pkg/ansibleserver/service/service.go index 3e9950f8f7..2fadcfa7bc 100644 --- a/pkg/ansibleserver/service/service.go +++ b/pkg/ansibleserver/service/service.go @@ -45,7 +45,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/apigateway/handler/backendproxy.go b/pkg/apigateway/handler/backendproxy.go index 801312308a..0f200aa02c 100644 --- a/pkg/apigateway/handler/backendproxy.go +++ b/pkg/apigateway/handler/backendproxy.go @@ -24,7 +24,9 @@ import ( "yunion.io/x/log" "yunion.io/x/pkg/appctx" + "yunion.io/x/pkg/util/httputils" + "yunion.io/x/onecloud/pkg/apis/identity" "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/httperrors" "yunion.io/x/onecloud/pkg/mcclient/auth" @@ -149,12 +151,11 @@ func (h *SBackendServiceProxyHandler) fetchReverseEndpoint() *proxy.SEndpointFac zone = path[:slashPos] } } - endpointType := "internalURL" session := auth.GetAdminSession(ctx, region) if len(zone) > 0 { session.SetZone(zone) } - ep, err := session.GetServiceURL(serviceName, endpointType) + ep, err := session.GetServiceURL(serviceName, identity.EndpointInterfaceInternal, httputils.THttpMethod(r.Method)) if err != nil { return "", httperrors.NewBadRequestError("invalid service %s: %s", serviceName, err) } diff --git a/pkg/apigateway/handler/proxy.go b/pkg/apigateway/handler/proxy.go index e5a4553f13..9d0aafe08e 100644 --- a/pkg/apigateway/handler/proxy.go +++ b/pkg/apigateway/handler/proxy.go @@ -19,7 +19,10 @@ import ( "net/http" "net/url" + "yunion.io/x/pkg/util/httputils" + "yunion.io/x/onecloud/pkg/apis" + "yunion.io/x/onecloud/pkg/apis/identity" "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/mcclient/auth" "yunion.io/x/onecloud/pkg/proxy" @@ -67,9 +70,8 @@ func getEndpointSchemeHost(endpoint string) (string, error) { func fetchReverseEndpoint(serviceName string) *proxy.SEndpointFactory { f := func(ctx context.Context, r *http.Request) (string, error) { - endpointType := "internalURL" session := auth.GetAdminSession(ctx, FetchRegion(r)) - ep, err := session.GetServiceURL(serviceName, endpointType) + ep, err := session.GetServiceURL(serviceName, identity.EndpointInterfaceInternal, httputils.THttpMethod(r.Method)) if err != nil { return "", err } diff --git a/pkg/apis/identity/consts.go b/pkg/apis/identity/consts.go index 32bb205d9d..a121e6de86 100644 --- a/pkg/apis/identity/consts.go +++ b/pkg/apis/identity/consts.go @@ -58,6 +58,8 @@ const ( EndpointInterfaceApigateway = "apigateway" + EndpointInterfaceSlave = "slave" + KeystoneDomainRoot = "<>" IdMappingEntityUser = "user" diff --git a/pkg/appsrv/dispatcher/dispatcher.go b/pkg/appsrv/dispatcher/dispatcher.go index 4cc9e70c88..e8489fae63 100644 --- a/pkg/appsrv/dispatcher/dispatcher.go +++ b/pkg/appsrv/dispatcher/dispatcher.go @@ -31,7 +31,7 @@ import ( "yunion.io/x/onecloud/pkg/mcclient/modulebase" ) -func AddModelDispatcher(prefix string, app *appsrv.Application, manager IModelDispatchHandler) { +func AddModelDispatcher(prefix string, app *appsrv.Application, manager IModelDispatchHandler, isSlave bool) { metadata := map[string]interface{}{"manager": manager} tags := map[string]string{"resource": manager.KeywordPlural()} // list @@ -69,6 +69,12 @@ func AddModelDispatcher(prefix string, app *appsrv.Application, manager IModelDi fmt.Sprintf("%s/%s//", prefix, manager.KeywordPlural()), manager.Filter(getSpecHandler), metadata, "get_specific", tags) manager.CustomizeHandlerInfo(h) + + if isSlave { + // slave node only support get and head + return + } + // create // create multi h = app.AddHandler2("POST", diff --git a/pkg/appsrv/dispatcher/jointdispatcher.go b/pkg/appsrv/dispatcher/jointdispatcher.go index 7603400529..5dea686ec4 100644 --- a/pkg/appsrv/dispatcher/jointdispatcher.go +++ b/pkg/appsrv/dispatcher/jointdispatcher.go @@ -29,7 +29,7 @@ import ( "yunion.io/x/onecloud/pkg/mcclient/modulebase" ) -func AddJointModelDispatcher(prefix string, app *appsrv.Application, manager IJointModelDispatchHandler) { +func AddJointModelDispatcher(prefix string, app *appsrv.Application, manager IJointModelDispatchHandler, isSlave bool) { metadata := map[string]interface{}{"manager": manager} tags := map[string]string{"resource": manager.KeywordPlural()} // list @@ -64,6 +64,12 @@ func AddJointModelDispatcher(prefix string, app *appsrv.Application, manager IJo manager.MasterKeywordPlural()), manager.Filter(jointGetHandler), metadata, "get_joint", tags) + + if isSlave { + // slave node only support get and head + return + } + // joint attach app.AddHandler2("POST", fmt.Sprintf("%s/%s//%s/", prefix, diff --git a/pkg/baremetal/manager.go b/pkg/baremetal/manager.go index 7a64e74856..6706e120c8 100644 --- a/pkg/baremetal/manager.go +++ b/pkg/baremetal/manager.go @@ -1155,7 +1155,7 @@ func (b *SBaremetalInstance) getHTTPFileUrl(filename string) string { func (b *SBaremetalInstance) GetImageUrl(disableImageCache bool) string { if disableImageCache { - url, err := b.GetPublicClientSession().GetServiceURL(apis.SERVICE_TYPE_IMAGE, apiidenty.EndpointInterfacePublic) + url, err := b.GetPublicClientSession().GetServiceURL(apis.SERVICE_TYPE_IMAGE, apiidenty.EndpointInterfacePublic, httputils.GET) if err != nil { log.Errorf("Get image public url: %v", err) return "" @@ -2388,7 +2388,7 @@ func (b *SBaremetalInstance) getBootIsoImagePath() string { func (b *SBaremetalInstance) DoNTPConfig() error { var urls []string for _, ep := range []string{"internal", "public"} { - urls, _ = auth.GetServiceURLs("ntp", o.Options.Region, "", ep) + urls, _ = auth.GetServiceURLs("ntp", o.Options.Region, "", ep, httputils.POST) if len(urls) > 0 { break } diff --git a/pkg/cloudcommon/app/auth.go b/pkg/cloudcommon/app/auth.go index 03e75e964a..e904a513cf 100644 --- a/pkg/cloudcommon/app/auth.go +++ b/pkg/cloudcommon/app/auth.go @@ -19,6 +19,7 @@ import ( "time" "yunion.io/x/log" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/utils" "yunion.io/x/onecloud/pkg/apis" @@ -113,7 +114,7 @@ func FetchEtcdServiceInfo() (*identity.EndpointDetails, error) { func startEtcdEndpointPuller() { retryInterval := 60 - etecdUrl, err := auth.GetServiceURL(apis.SERVICE_TYPE_ETCD, consts.GetRegion(), "", identity.EndpointInterfaceInternal) + etecdUrl, err := auth.GetServiceURL(apis.SERVICE_TYPE_ETCD, consts.GetRegion(), "", identity.EndpointInterfaceInternal, httputils.POST) if err != nil { log.Errorf("[etcd] GetServiceURL fail %s, retry after %d seconds", err, retryInterval) } else if len(etecdUrl) == 0 { diff --git a/pkg/cloudcommon/db/quotas/handler.go b/pkg/cloudcommon/db/quotas/handler.go index 4fd55eaaa6..bd3b26b2ff 100644 --- a/pkg/cloudcommon/db/quotas/handler.go +++ b/pkg/cloudcommon/db/quotas/handler.go @@ -72,7 +72,7 @@ type SBaseQuotaQueryInput struct { Refresh bool `json:"refresh"` } -func AddQuotaHandler(manager *SQuotaBaseManager, prefix string, app *appsrv.Application) { +func AddQuotaHandler(manager *SQuotaBaseManager, prefix string, app *appsrv.Application, isSlave bool) { app.AddHandler2("GET", fmt.Sprintf("%s/%s", prefix, manager.KeywordPlural()), auth.Authenticate(manager.getQuotaHandler), nil, "get_quota", nil) @@ -85,21 +85,23 @@ func AddQuotaHandler(manager *SQuotaBaseManager, prefix string, app *appsrv.Appl fmt.Sprintf("%s/%s/domains/", prefix, manager.KeywordPlural()), auth.Authenticate(manager.getQuotaHandler), nil, "get_quota_for_domain", nil) - app.AddHandler2("POST", - fmt.Sprintf("%s/%s", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.setQuotaHandler), nil, "set_quota", nil) + if !isSlave { + app.AddHandler2("POST", + fmt.Sprintf("%s/%s", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.setQuotaHandler), nil, "set_quota", nil) - app.AddHandler2("POST", - fmt.Sprintf("%s/%s/domains/", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_domain", nil) + app.AddHandler2("POST", + fmt.Sprintf("%s/%s/domains/", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_domain", nil) - app.AddHandler2("DELETE", - fmt.Sprintf("%s/%s/pending", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage", nil) + app.AddHandler2("DELETE", + fmt.Sprintf("%s/%s/pending", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage", nil) - app.AddHandler2("DELETE", - fmt.Sprintf("%s/%s/domains//pending", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage_for_domain", nil) + app.AddHandler2("DELETE", + fmt.Sprintf("%s/%s/domains//pending", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage_for_domain", nil) + } if manager.scope == rbacscope.ScopeProject { app.AddHandler2("GET", @@ -114,17 +116,19 @@ func AddQuotaHandler(manager *SQuotaBaseManager, prefix string, app *appsrv.Appl fmt.Sprintf("%s/%s/projects/", prefix, manager.KeywordPlural()), auth.Authenticate(manager.getQuotaHandler), nil, "get_quota_for_project", nil) - app.AddHandler2("POST", - fmt.Sprintf("%s/%s/", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_project", nil) + if !isSlave { + app.AddHandler2("POST", + fmt.Sprintf("%s/%s/", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_project", nil) - app.AddHandler2("POST", - fmt.Sprintf("%s/%s/projects/", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_project", nil) + app.AddHandler2("POST", + fmt.Sprintf("%s/%s/projects/", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.setQuotaHandler), nil, "set_quota_for_project", nil) - app.AddHandler2("DELETE", - fmt.Sprintf("%s/%s/projects//pending", prefix, manager.KeywordPlural()), - auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage_for_project", nil) + app.AddHandler2("DELETE", + fmt.Sprintf("%s/%s/projects//pending", prefix, manager.KeywordPlural()), + auth.Authenticate(manager.cleanPendingUsageHandler), nil, "clean_pending_usage_for_project", nil) + } } } diff --git a/pkg/cloudcommon/db/taskman/handler.go b/pkg/cloudcommon/db/taskman/handler.go index 16f3b012f6..143431bfae 100644 --- a/pkg/cloudcommon/db/taskman/handler.go +++ b/pkg/cloudcommon/db/taskman/handler.go @@ -20,13 +20,13 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/db" ) -func AddTaskHandler(prefix string, app *appsrv.Application) { +func AddTaskHandler(prefix string, app *appsrv.Application, isSlave bool) { handler := db.NewModelHandler(TaskManager) - dispatcher.AddModelDispatcher(prefix, app, handler) + dispatcher.AddModelDispatcher(prefix, app, handler, isSlave) { initArchivedTaskManager() archiveHandler := db.NewModelHandler(ArchivedTaskManager) - dispatcher.AddModelDispatcher(prefix, app, archiveHandler) + dispatcher.AddModelDispatcher(prefix, app, archiveHandler, isSlave) } } diff --git a/pkg/cloudcommon/tsdb/tsdb.go b/pkg/cloudcommon/tsdb/tsdb.go index 0aec6f4218..7e931f5b96 100644 --- a/pkg/cloudcommon/tsdb/tsdb.go +++ b/pkg/cloudcommon/tsdb/tsdb.go @@ -18,6 +18,7 @@ import ( "math/rand" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/onecloud/pkg/apis" "yunion.io/x/onecloud/pkg/mcclient" @@ -38,7 +39,7 @@ func NewTSDBServiceSource(t string, urls []string) *TSDBServiceSource { func GetDefaultServiceSource(s *mcclient.ClientSession, endpointType string) (*TSDBServiceSource, error) { errs := []error{} for _, sType := range []string{apis.SERVICE_TYPE_INFLUXDB, apis.SERVICE_TYPE_VICTORIA_METRICS} { - urls, err := s.GetServiceURLs(sType, endpointType) + urls, err := s.GetServiceURLs(sType, endpointType, httputils.POST) if err != nil { errs = append(errs, errors.Wrapf(err, "get %s service type %q", endpointType, sType)) } diff --git a/pkg/cloudevent/service/handlers.go b/pkg/cloudevent/service/handlers.go index a0cde7d90f..e20a885342 100644 --- a/pkg/cloudevent/service/handlers.go +++ b/pkg/cloudevent/service/handlers.go @@ -37,12 +37,12 @@ import ( "yunion.io/x/onecloud/pkg/cloudevent/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() models.InitCloudevent() - taskman.AddTaskHandler("v1", app) + taskman.AddTaskHandler("v1", app, isSlave) for _, manager := range []db.IModelManager{ taskman.TaskManager, @@ -67,6 +67,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/cloudevent/service/service.go b/pkg/cloudevent/service/service.go index 84c8d1917b..7cc4c51f20 100644 --- a/pkg/cloudevent/service/service.go +++ b/pkg/cloudevent/service/service.go @@ -52,7 +52,7 @@ func StartService() { app := common_app.InitApp(baseOpts, false) cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/cloudid/service/handlers.go b/pkg/cloudid/service/handlers.go index e2e5d429eb..35f9ab9571 100644 --- a/pkg/cloudid/service/handlers.go +++ b/pkg/cloudid/service/handlers.go @@ -37,10 +37,10 @@ import ( "yunion.io/x/onecloud/pkg/cloudid/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() - taskman.AddTaskHandler("v1", app) + taskman.AddTaskHandler("v1", app, isSlave) db.AddScopeResourceCountHandler("", app) @@ -72,7 +72,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } for _, manager := range []db.IJointModelManager{ @@ -82,7 +82,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewJointModelHandler(manager) - dispatcher.AddJointModelDispatcher("", app, handler) + dispatcher.AddJointModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/cloudid/service/service.go b/pkg/cloudid/service/service.go index 0584efb3fa..a8b4419828 100644 --- a/pkg/cloudid/service/service.go +++ b/pkg/cloudid/service/service.go @@ -64,7 +64,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() @@ -75,17 +75,17 @@ func StartService() { return } - models.CloudaccountManager.StartWatchSAMLInRegion() - if err != nil { - log.Fatalf("StartWatchSAMLInRegion error: %v", err) - } - - models.CloudproviderManager.StartWatchInRegion() - if err != nil { - log.Fatalf("StartWatchInRegion error: %v", err) - } - if !opts.IsSlaveNode { + models.CloudaccountManager.StartWatchSAMLInRegion() + if err != nil { + log.Fatalf("StartWatchSAMLInRegion error: %v", err) + } + + models.CloudproviderManager.StartWatchInRegion() + if err != nil { + log.Fatalf("StartWatchInRegion error: %v", err) + } + err := taskman.TaskManager.InitializeData() if err != nil { log.Fatalf("TaskManager.InitializeData fail %s", err) diff --git a/pkg/cloudir/service/handlers.go b/pkg/cloudir/service/handlers.go index dd30bac509..6bf9d1cc48 100644 --- a/pkg/cloudir/service/handlers.go +++ b/pkg/cloudir/service/handlers.go @@ -22,11 +22,11 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/etcd/models/base" ) -func initHandlers(app *appsrv.Application) { +func initHandlers(app *appsrv.Application, isSlave bool) { for _, manager := range []base.IEtcdModelManager{ models.ServiceRegistryManager, } { handler := handler.NewEtcdModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/cloudir/service/service.go b/pkg/cloudir/service/service.go index ffb20c73c1..34096fa6c8 100644 --- a/pkg/cloudir/service/service.go +++ b/pkg/cloudir/service/service.go @@ -42,7 +42,7 @@ func StartService() { } app := app_common.InitApp(baseOpts, false) - initHandlers(app) + initHandlers(app, opts.IsSlaveNode) app_common.ServeForeverWithCleanup(app, baseOpts, func() { etcd.CloseDefaultEtcdClient() diff --git a/pkg/cloudnet/service/handlers.go b/pkg/cloudnet/service/handlers.go index 991c8061d2..3ba24dec1c 100644 --- a/pkg/cloudnet/service/handlers.go +++ b/pkg/cloudnet/service/handlers.go @@ -35,7 +35,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudnet/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegisterModelManager(db.OpsLog) @@ -52,6 +52,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/cloudnet/service/service.go b/pkg/cloudnet/service/service.go index bd3423bfeb..f1cae1cf10 100644 --- a/pkg/cloudnet/service/service.go +++ b/pkg/cloudnet/service/service.go @@ -54,7 +54,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/cloudproxy/service/handlers.go b/pkg/cloudproxy/service/handlers.go index 0104919860..50620254e2 100644 --- a/pkg/cloudproxy/service/handlers.go +++ b/pkg/cloudproxy/service/handlers.go @@ -21,7 +21,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudproxy/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegisterModelManager(db.OpsLog) @@ -36,6 +36,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/cloudproxy/service/service.go b/pkg/cloudproxy/service/service.go index 26e4383910..33b7948959 100644 --- a/pkg/cloudproxy/service/service.go +++ b/pkg/cloudproxy/service/service.go @@ -46,7 +46,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/compute/misc/handler.go b/pkg/compute/misc/handler.go index 492701ffc0..015ef09baf 100644 --- a/pkg/compute/misc/handler.go +++ b/pkg/compute/misc/handler.go @@ -22,6 +22,7 @@ import ( "yunion.io/x/log" "yunion.io/x/pkg/tristate" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/onecloud/pkg/apis/compute" "yunion.io/x/onecloud/pkg/appsrv" @@ -78,7 +79,7 @@ func getBmPrepareScript(ctx context.Context, w http.ResponseWriter, r *http.Requ httperrors.NotAcceptableError(ctx, w, "Baremetal package not prepared") return } - regionUrl, err := auth.GetPublicServiceURL("compute_v2", options.Options.Region, "") + regionUrl, err := auth.GetPublicServiceURL("compute_v2", options.Options.Region, "", httputils.POST) if err != nil { httperrors.InternalServerError(ctx, w, "%v", err) return diff --git a/pkg/compute/models/buckets.go b/pkg/compute/models/buckets.go index 4090bab5c9..7bccae70dc 100644 --- a/pkg/compute/models/buckets.go +++ b/pkg/compute/models/buckets.go @@ -30,6 +30,7 @@ import ( "yunion.io/x/log" "yunion.io/x/pkg/errors" "yunion.io/x/pkg/util/compare" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/rbacscope" "yunion.io/x/pkg/utils" "yunion.io/x/sqlchemy" @@ -611,7 +612,7 @@ func joinPath(ep, path string) string { } func (bucket *SBucket) getMoreDetails(out api.BucketDetails) api.BucketDetails { - s3gwUrl, _ := auth.GetServiceURL("s3gateway", options.Options.Region, "", identity_apis.EndpointInterfacePublic) + s3gwUrl, _ := auth.GetServiceURL("s3gateway", options.Options.Region, "", identity_apis.EndpointInterfacePublic, httputils.POST) if len(s3gwUrl) > 0 { accessUrls := make([]cloudprovider.SBucketAccessUrl, 0) diff --git a/pkg/compute/models/hosts.go b/pkg/compute/models/hosts.go index 4a9cb6f847..96e4183de8 100644 --- a/pkg/compute/models/hosts.go +++ b/pkg/compute/models/hosts.go @@ -55,6 +55,7 @@ import ( hostapi "yunion.io/x/onecloud/pkg/apis/host" napi "yunion.io/x/onecloud/pkg/apis/notify" "yunion.io/x/onecloud/pkg/appsrv" + "yunion.io/x/onecloud/pkg/cloudcommon/consts" "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman" "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas" @@ -1452,7 +1453,7 @@ func (hh *SHostManager) GetPropertyK8sMasterNodeIps(ctx context.Context, userCre } func (hh *SHostManager) GetPropertyBmStartRegisterScript(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (jsonutils.JSONObject, error) { - regionUri, err := auth.GetPublicServiceURL("compute_v2", options.Options.Region, "") + regionUri, err := auth.GetPublicServiceURL(consts.GetServiceType(), options.Options.Region, "", httputils.POST) if err != nil { return nil, err } diff --git a/pkg/compute/service/handlers.go b/pkg/compute/service/handlers.go index 1a9e79c5e9..6d8e494322 100644 --- a/pkg/compute/service/handlers.go +++ b/pkg/compute/service/handlers.go @@ -32,20 +32,22 @@ import ( "yunion.io/x/onecloud/pkg/compute/usages" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegistUserCredCacheUpdater() db.AddScopeResourceCountHandler("", app) - db.AddHistoryDataCleanHandler("", app) + if !isSlave { + db.AddHistoryDataCleanHandler("", app) + } - quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, "", app) - quotas.AddQuotaHandler(&models.RegionQuotaManager.SQuotaBaseManager, "", app) - quotas.AddQuotaHandler(&models.ZoneQuotaManager.SQuotaBaseManager, "", app) - quotas.AddQuotaHandler(&models.ProjectQuotaManager.SQuotaBaseManager, "", app) - quotas.AddQuotaHandler(&models.DomainQuotaManager.SQuotaBaseManager, "", app) - quotas.AddQuotaHandler(&models.InfrasQuotaManager.SQuotaBaseManager, "", app) + quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, "", app, isSlave) + quotas.AddQuotaHandler(&models.RegionQuotaManager.SQuotaBaseManager, "", app, isSlave) + quotas.AddQuotaHandler(&models.ZoneQuotaManager.SQuotaBaseManager, "", app, isSlave) + quotas.AddQuotaHandler(&models.ProjectQuotaManager.SQuotaBaseManager, "", app, isSlave) + quotas.AddQuotaHandler(&models.DomainQuotaManager.SQuotaBaseManager, "", app, isSlave) + quotas.AddQuotaHandler(&models.InfrasQuotaManager.SQuotaBaseManager, "", app, isSlave) usages.AddUsageHandler("", app) usages.AddHistoryUsageHandler("", app) @@ -53,7 +55,7 @@ func InitHandlers(app *appsrv.Application) { specs.AddSpecHandler("", app) sshkeys.AddSshKeysHandler("", app) - taskman.AddTaskHandler("", app) + taskman.AddTaskHandler("", app, isSlave) misc.AddMiscHandler("", app) @@ -261,7 +263,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } for _, manager := range []db.IJointModelManager{ @@ -295,6 +297,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewJointModelHandler(manager) - dispatcher.AddJointModelDispatcher("", app, handler) + dispatcher.AddJointModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/compute/service/service.go b/pkg/compute/service/service.go index 20b2c7b5e6..3b16bb143e 100644 --- a/pkg/compute/service/service.go +++ b/pkg/compute/service/service.go @@ -25,6 +25,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" _ "yunion.io/x/sqlchemy/backends" "yunion.io/x/onecloud/pkg/apis" @@ -85,7 +86,7 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll }) common_options.StartOptionManager(opts, opts.ConfigSyncPeriodSeconds, api.SERVICE_TYPE, api.SERVICE_VERSION, options.OnOptionsChange) - serviceUrl, err := auth.GetServiceURL(apis.SERVICE_TYPE_REGION, opts.Region, "", identity.EndpointInterfaceInternal) + serviceUrl, err := auth.GetServiceURL(apis.SERVICE_TYPE_REGION, opts.Region, "", identity.EndpointInterfaceInternal, httputils.POST) if err != nil { log.Fatalf("unable to get service url: %v", err) } @@ -115,7 +116,7 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) if appCllback != nil { appCllback(app) } @@ -123,10 +124,18 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() + if !opts.IsSlaveNode { + startMasterTasks(opts, dbOpts, jobs) + } + + common_app.ServeForever(app, baseOpts) +} + +func startMasterTasks(opts *options.ComputeOptions, dbOpts *common_options.DBOptions, jobs func(cron *cronman.SCronJobManager)) { setInfluxdbRetentionPolicy() - models.InitSyncWorkers(options.Options.CloudSyncWorkerCount) - cloudaccount_tasks.InitCloudproviderSyncWorkers(options.Options.CloudProviderSyncWorkerCount) + models.InitSyncWorkers(opts.CloudSyncWorkerCount) + cloudaccount_tasks.InitCloudproviderSyncWorkers(opts.CloudProviderSyncWorkerCount) var ( electObj *elect.Elect @@ -229,11 +238,8 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll go cron.Start2(ctx, electObj) } - if !opts.IsSlaveNode { - go cronFunc() - } - common_app.ServeForever(app, baseOpts) + go cronFunc() } func initDefaultEtcdClient(opts *common_options.DBOptions) error { diff --git a/pkg/devtool/service/handler.go b/pkg/devtool/service/handler.go index 88bba2afbf..4b92c8ce6f 100644 --- a/pkg/devtool/service/handler.go +++ b/pkg/devtool/service/handler.go @@ -22,10 +22,10 @@ import ( "yunion.io/x/onecloud/pkg/devtool/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() - taskman.AddTaskHandler("", app) + taskman.AddTaskHandler("", app, isSlave) for _, manager := range []db.IModelManager{ taskman.TaskManager, @@ -54,6 +54,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/devtool/service/service.go b/pkg/devtool/service/service.go index 8264f1c2ac..508b25be49 100644 --- a/pkg/devtool/service/service.go +++ b/pkg/devtool/service/service.go @@ -45,7 +45,7 @@ func StartService() { app := app_common.InitApp(&opts.BaseOptions, false) cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/hostman/hostinfo/hostinfo.go b/pkg/hostman/hostinfo/hostinfo.go index 53392dbec0..0a4e016074 100644 --- a/pkg/hostman/hostinfo/hostinfo.go +++ b/pkg/hostman/hostinfo/hostinfo.go @@ -36,6 +36,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/regutils" "yunion.io/x/pkg/util/version" "yunion.io/x/pkg/utils" @@ -2516,7 +2517,7 @@ func (h *SHostInfo) OnCatalogChanged(catalog mcclient.KeystoneServiceCatalogV3) if options.HostOptions.ManageNtpConfiguration { ntpd := system_service.GetService("ntpd") - urls, _ := s.GetServiceURLs("ntp", defaultEndpointType) + urls, _ := s.GetServiceURLs("ntp", defaultEndpointType, httputils.POST) if len(urls) > 0 { log.Infof("Get Ntp urls: %v", urls) } else { @@ -2553,7 +2554,7 @@ func (h *SHostInfo) OnCatalogChanged(catalog mcclient.KeystoneServiceCatalogV3) hostconsts.TELEGRAF_TAG_KEY_HYPERVISOR: options.HostOptions.HostType, } conf["nics"] = h.getNicsTelegrafConf() - urls, _ := s.GetServiceURLs("kafka", defaultEndpointType) + urls, _ := s.GetServiceURLs("kafka", defaultEndpointType, httputils.POST) if len(urls) > 0 { kafkaConf := map[string]interface{}{ "brokers": urls, @@ -2571,7 +2572,7 @@ func (h *SHostInfo) OnCatalogChanged(catalog mcclient.KeystoneServiceCatalogV3) conf["kafka"] = kafkaConf } - urls, _ = s.GetServiceURLs("opentsdb", defaultEndpointType) + urls, _ = s.GetServiceURLs("opentsdb", defaultEndpointType, httputils.POST) if len(urls) > 0 { conf["opentsdb"] = map[string]interface{}{ "url": urls[0], diff --git a/pkg/hostman/storageman/imagecache_local.go b/pkg/hostman/storageman/imagecache_local.go index a2be7bbd3e..f59f430d5b 100644 --- a/pkg/hostman/storageman/imagecache_local.go +++ b/pkg/hostman/storageman/imagecache_local.go @@ -199,7 +199,7 @@ func (l *SLocalImageCache) prepare(ctx context.Context, input api.CacheImageInpu l.consumerCount++ return true, nil } - url, err := auth.GetServiceURL(apis.SERVICE_TYPE_IMAGE, "", input.Zone, "") + url, err := auth.GetServiceURL(apis.SERVICE_TYPE_IMAGE, "", input.Zone, "", httputils.GET) if err != nil { return false, errors.Wrapf(err, "GetServiceURL(%s)", apis.SERVICE_TYPE_IMAGE) } diff --git a/pkg/image/service/handlers.go b/pkg/image/service/handlers.go index 494d7156ab..35b57230b6 100644 --- a/pkg/image/service/handlers.go +++ b/pkg/image/service/handlers.go @@ -30,7 +30,7 @@ const ( API_VERSION = "v1" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() // add version handler with API_VERSION prefix @@ -40,10 +40,10 @@ func InitHandlers(app *appsrv.Application) { db.AddScopeResourceCountHandler(API_VERSION, app) - quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, API_VERSION, app) + quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, API_VERSION, app, isSlave) usages.AddUsageHandler(API_VERSION, app) - taskman.AddTaskHandler(API_VERSION, app) + taskman.AddTaskHandler(API_VERSION, app, isSlave) app_common.ExportOptionsHandler(app, &options.Options) @@ -80,6 +80,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher(API_VERSION, app, handler) + dispatcher.AddModelDispatcher(API_VERSION, app, handler, isSlave) } } diff --git a/pkg/image/service/service.go b/pkg/image/service/service.go index 85126937e8..fbdf6d9e72 100644 --- a/pkg/image/service/service.go +++ b/pkg/image/service/service.go @@ -104,18 +104,8 @@ func StartService() { procutils.SetRemoteExecutor() } - log.Infof("Target image formats %#v", opts.TargetImageFormats) + if !opts.IsSlaveNode { - if ok, err := hasVmwareAccount(); err != nil { - log.Errorf("failed get vmware cloudaccounts: %v", err) - } else if ok { - if !utils.IsInStringArray(string(qemuimgfmt.VMDK), options.Options.TargetImageFormats) { - if err = models.UpdateImageConfigTargetImageFormats(context.Background(), auth.AdminCredential()); err != nil { - log.Errorf("failed update target_image_formats %s", err) - } else { - options.Options.TargetImageFormats = append(options.Options.TargetImageFormats, string(qemuimgfmt.VMDK)) - } - } } trackers := torrent.GetTrackers() @@ -128,7 +118,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) @@ -139,35 +129,8 @@ func StartService() { deployclient.Init(options.Options.DeployServerSocketPath) } - go func() { - if options.Options.HasValidS3Options() { - initS3() - } - // check image after s3 mounted - models.CheckImages() - }() - if !opts.IsSlaveNode { - err := taskman.TaskManager.InitializeData() - if err != nil { - log.Fatalf("TaskManager.InitializeData fail %s", err) - } - - cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds) - - cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount, options.Options.TimeZone) - cron.AddJobAtIntervals("CleanPendingDeleteImages", time.Duration(options.Options.PendingDeleteCheckSeconds)*time.Second, models.ImageManager.CleanPendingDeleteImages) - cron.AddJobAtIntervals("CalculateQuotaUsages", time.Duration(opts.CalculateQuotaUsageIntervalSeconds)*time.Second, models.QuotaManager.CalculateQuotaUsages) - cron.AddJobAtIntervals("CleanPendingDeleteGuestImages", - time.Duration(options.Options.PendingDeleteCheckSeconds)*time.Second, models.GuestImageManager.CleanPendingDeleteImages) - - cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) - - cron.AddJobAtIntervalsWithStartRun("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalMinutes)*time.Minute, taskman.TaskManager.TaskCleanupJob, true) - - cron.AddJobAtIntervals("MarkDataImage", time.Duration(options.Options.VerifyImageStatusIntervalMinutes)*time.Minute, models.ImageManager.VerifyActiveImageStatus) - - cron.Start() + startMasterTasks(opts) } app_common.ServeForeverWithCleanup(app, baseOpts, func() { @@ -184,6 +147,51 @@ func StartService() { }) } +func startMasterTasks(opts *options.SImageOptions) { + log.Infof("Target image formats %#v", opts.TargetImageFormats) + + if ok, err := hasVmwareAccount(); err != nil { + log.Errorf("failed get vmware cloudaccounts: %v", err) + } else if ok { + if !utils.IsInStringArray(string(qemuimgfmt.VMDK), options.Options.TargetImageFormats) { + if err = models.UpdateImageConfigTargetImageFormats(context.Background(), auth.AdminCredential()); err != nil { + log.Errorf("failed update target_image_formats %s", err) + } else { + options.Options.TargetImageFormats = append(options.Options.TargetImageFormats, string(qemuimgfmt.VMDK)) + } + } + } + + go func() { + if options.Options.HasValidS3Options() { + initS3() + } + // check image after s3 mounted + models.CheckImages() + }() + + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + + cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds) + + cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount, options.Options.TimeZone) + cron.AddJobAtIntervals("CleanPendingDeleteImages", time.Duration(options.Options.PendingDeleteCheckSeconds)*time.Second, models.ImageManager.CleanPendingDeleteImages) + cron.AddJobAtIntervals("CalculateQuotaUsages", time.Duration(opts.CalculateQuotaUsageIntervalSeconds)*time.Second, models.QuotaManager.CalculateQuotaUsages) + cron.AddJobAtIntervals("CleanPendingDeleteGuestImages", + time.Duration(options.Options.PendingDeleteCheckSeconds)*time.Second, models.GuestImageManager.CleanPendingDeleteImages) + + cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + + cron.AddJobAtIntervalsWithStartRun("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalMinutes)*time.Minute, taskman.TaskManager.TaskCleanupJob, true) + + cron.AddJobAtIntervals("MarkDataImage", time.Duration(options.Options.VerifyImageStatusIntervalMinutes)*time.Minute, models.ImageManager.VerifyActiveImageStatus) + + cron.Start() +} + func hasVmwareAccount() (bool, error) { q := jsonutils.NewDict() q.Add(jsonutils.NewString("system"), "scope") diff --git a/pkg/image/torrent/torrent.go b/pkg/image/torrent/torrent.go index 56de07c308..73596fa592 100644 --- a/pkg/image/torrent/torrent.go +++ b/pkg/image/torrent/torrent.go @@ -20,6 +20,7 @@ import ( "time" "yunion.io/x/log" + "yunion.io/x/pkg/util/httputils" identity_apis "yunion.io/x/onecloud/pkg/apis/identity" "yunion.io/x/onecloud/pkg/appsrv" @@ -61,7 +62,7 @@ func (stat *STorrentProcessState) StopAndWait() error { } func GetTrackers() []string { - urls, err := auth.GetServiceURLs(TORRENT_TRACKER_SERVICE, options.Options.Region, "", "") + urls, err := auth.GetServiceURLs(TORRENT_TRACKER_SERVICE, options.Options.Region, "", "", httputils.POST) if err != nil { log.Errorf("fail to get torrent-tracker") return nil @@ -98,7 +99,7 @@ func SeedTorrent(torrentpath string, imageId, format string) error { } func seedTorrent(torrentpath string, imageId, format string) error { - url, err := auth.GetServiceURL("image", options.Options.Region, "", identity_apis.EndpointInterfacePublic) + url, err := auth.GetServiceURL("image", options.Options.Region, "", identity_apis.EndpointInterfacePublic, httputils.POST) if err != nil { return err } diff --git a/pkg/keystone/models/endpoints.go b/pkg/keystone/models/endpoints.go index 8805aa6c7a..e2bbb98cef 100644 --- a/pkg/keystone/models/endpoints.go +++ b/pkg/keystone/models/endpoints.go @@ -269,15 +269,15 @@ func (ep *SEndpoint) getService() *SService { } type SEndpointExtended struct { - Id string - Name string - Interface string - Url string - Region string - RegionId string - ServiceId string - ServiceType string - ServiceName string + Id string `json:"id"` + Name string `json:"name"` + Interface string `json:"interface"` + Url string `json:"url"` + Region string `json:"region"` + RegionId string `json:"region_id"` + ServiceId string `json:"service_id"` + ServiceType string `json:"service_type"` + ServiceName string `json:"service_name"` } type SServiceCatalog []SEndpointExtended @@ -493,7 +493,8 @@ func (manager *SEndpointManager) ValidateCreateData( } service := servObj.(*SService) if !data.Contains("name") { - data.Set("name", jsonutils.NewString(fmt.Sprintf("%s-%s", service.Type, infname))) + nameStr := fmt.Sprintf("%s-%s", service.Type, infname) + data.Set("name", jsonutils.NewString(nameStr)) } data.Set("service_id", jsonutils.NewString(service.Id)) } else { @@ -550,10 +551,7 @@ func (manager *SEndpointManager) ListItemFilter( } } if len(query.Interface) > 0 { - infType := query.Interface - if strings.HasSuffix(infType, "URL") { - infType = infType[0 : len(infType)-3] - } + infType := strings.TrimSuffix(query.Interface, "URL") q = q.Equals("interface", infType) } return q, nil diff --git a/pkg/keystone/service/handlers.go b/pkg/keystone/service/handlers.go index 37ab549bda..378d3c1270 100644 --- a/pkg/keystone/service/handlers.go +++ b/pkg/keystone/service/handlers.go @@ -32,18 +32,18 @@ const ( API_VERSION = "v3" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() // add version handler with API_VERSION prefix app.AddDefaultHandler("GET", API_VERSION+"/version", appsrv.VersionHandler, "version") cronjobs.AddRefreshHandler(API_VERSION, app) - quotas.AddQuotaHandler(&models.IdentityQuotaManager.SQuotaBaseManager, API_VERSION, app) + quotas.AddQuotaHandler(&models.IdentityQuotaManager.SQuotaBaseManager, API_VERSION, app, isSlave) usages.AddUsageHandler(API_VERSION, app) - taskman.AddTaskHandler(API_VERSION, app) + taskman.AddTaskHandler(API_VERSION, app, isSlave) app_common.ExportOptionsHandlerWithPrefix(app, API_VERSION, &options.Options) @@ -108,7 +108,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher(API_VERSION, app, handler) + dispatcher.AddModelDispatcher(API_VERSION, app, handler, isSlave) } models.AddAdhocHandlers(API_VERSION, app) diff --git a/pkg/keystone/service/service.go b/pkg/keystone/service/service.go index ef37314a66..dcb079c218 100644 --- a/pkg/keystone/service/service.go +++ b/pkg/keystone/service/service.go @@ -86,7 +86,7 @@ func StartService() { cloudcommon.InitDB(&opts.DBOptions) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, &opts.DBOptions, models.InitDB) @@ -94,7 +94,7 @@ func StartService() { common_options.StartOptionManagerWithSessionDriver(opts, opts.ConfigSyncPeriodSeconds, api.SERVICE_TYPE, "", options.OnOptionsChange, models.NewServiceConfigSession()) - { + if !opts.IsSlaveNode { err := models.UserManager.EnforceUserMfa(context.Background()) if err != nil { log.Errorf("EnforceUserMfa fail %s", err) diff --git a/pkg/llm/service/handler.go b/pkg/llm/service/handler.go index 665843c1e8..e16c3d1c63 100644 --- a/pkg/llm/service/handler.go +++ b/pkg/llm/service/handler.go @@ -8,11 +8,11 @@ import ( "yunion.io/x/onecloud/pkg/llm/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegistUserCredCacheUpdater() - taskman.AddTaskHandler("", app) + taskman.AddTaskHandler("", app, isSlave) for _, manager := range []db.IModelManager{ taskman.TaskManager, @@ -44,6 +44,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/llm/service/service.go b/pkg/llm/service/service.go index 0d59d4ec18..69303c3b72 100644 --- a/pkg/llm/service/service.go +++ b/pkg/llm/service/service.go @@ -34,7 +34,7 @@ func StartService() { app := app_common.InitApp(&opts.BaseOptions, false) cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/logger/service/handlers.go b/pkg/logger/service/handlers.go index bd08b459f5..0d3ea34e44 100644 --- a/pkg/logger/service/handlers.go +++ b/pkg/logger/service/handlers.go @@ -22,7 +22,7 @@ import ( "yunion.io/x/onecloud/pkg/logger/options" ) -func initHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() models.InitActionLog() @@ -48,6 +48,6 @@ func initHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/logger/service/service.go b/pkg/logger/service/service.go index 5519e151ff..71d3123430 100644 --- a/pkg/logger/service/service.go +++ b/pkg/logger/service/service.go @@ -53,7 +53,7 @@ func StartService() { cloudcommon.InitDB(dbOpts) - initHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() diff --git a/pkg/mcclient/auth/auth.go b/pkg/mcclient/auth/auth.go index 8c52e54a2b..47fc18ffa6 100644 --- a/pkg/mcclient/auth/auth.go +++ b/pkg/mcclient/auth/auth.go @@ -298,16 +298,16 @@ func (a *authManager) reAuth() { a.SyncOnce(false, false) } -func (a *authManager) GetServiceURL(service, region, zone, endpointType string) (string, error) { - return a.getAdminSession(context.Background(), region, zone, endpointType).GetServiceURL(service, endpointType) +func (a *authManager) GetServiceURL(service, region, zone, endpointType string, method httputils.THttpMethod) (string, error) { + return a.getAdminSession(context.Background(), region, zone, endpointType).GetServiceURL(service, endpointType, method) } -func (a *authManager) GetServiceURLs(service, region, zone, endpointType string) ([]string, error) { - return a.getAdminSession(context.Background(), region, zone, endpointType).GetServiceURLs(service, endpointType) +func (a *authManager) GetServiceURLs(service, region, zone, endpointType string, method httputils.THttpMethod) ([]string, error) { + return a.getAdminSession(context.Background(), region, zone, endpointType).GetServiceURLs(service, endpointType, method) } func (a *authManager) getServiceIPs(service, region, zone, endpointType string, needResolve bool) ([]string, error) { - urls, err := a.GetServiceURLs(service, region, zone, endpointType) + urls, err := a.GetServiceURLs(service, region, zone, endpointType, httputils.POST) if err != nil { return nil, errors.Wrap(err, "GetServiceURLs") } @@ -384,16 +384,16 @@ func VerifyRequest(req http.Request, virtualHost bool) (mcclient.TokenCredential return manager.verifyRequest(req, virtualHost) } -func GetServiceURL(service, region, zone, endpointType string) (string, error) { - return manager.GetServiceURL(service, region, zone, endpointType) +func GetServiceURL(service, region, zone, endpointType string, method httputils.THttpMethod) (string, error) { + return manager.GetServiceURL(service, region, zone, endpointType, method) } -func GetPublicServiceURL(service, region, zone string) (string, error) { - return manager.GetServiceURL(service, region, zone, identity.EndpointInterfacePublic) +func GetPublicServiceURL(service, region, zone string, method httputils.THttpMethod) (string, error) { + return manager.GetServiceURL(service, region, zone, identity.EndpointInterfacePublic, method) } -func GetServiceURLs(service, region, zone, endpointType string) ([]string, error) { - return manager.GetServiceURLs(service, region, zone, endpointType) +func GetServiceURLs(service, region, zone, endpointType string, method httputils.THttpMethod) ([]string, error) { + return manager.GetServiceURLs(service, region, zone, endpointType, method) } func GetDNSServers(region, zone string) ([]string, error) { diff --git a/pkg/mcclient/modulebase/base.go b/pkg/mcclient/modulebase/base.go index a763761570..af42febe67 100644 --- a/pkg/mcclient/modulebase/base.go +++ b/pkg/mcclient/modulebase/base.go @@ -128,9 +128,9 @@ func (this *BaseManager) rawRequest(session *mcclient.ClientSession, header, body) } -func (this *BaseManager) GetBaseUrl(s *mcclient.ClientSession) (string, error) { +/*func (this *BaseManager) GetBaseUrl(s *mcclient.ClientSession) (string, error) { return s.GetBaseUrl(this.serviceType, this.endpointType) -} +}*/ func (this *BaseManager) rawBaseUrlRequest(s *mcclient.ClientSession, method httputils.THttpMethod, path string, diff --git a/pkg/mcclient/modulebase/modules.go b/pkg/mcclient/modulebase/modules.go index eb83b8cbf1..2e72018c51 100644 --- a/pkg/mcclient/modulebase/modules.go +++ b/pkg/mcclient/modulebase/modules.go @@ -23,6 +23,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/printutils" "yunion.io/x/pkg/util/sets" @@ -301,7 +302,7 @@ func _getModule(session *mcclient.ClientSession, name string) (IBaseManager, err } for _, mod := range mods { - url, e := session.GetServiceURL(mod.ServiceType(), mod.EndpointType()) + url, e := session.GetServiceURL(mod.ServiceType(), mod.EndpointType(), httputils.POST) if e != nil { return nil, errors.Wrap(e, "session.GetServiceURL") } @@ -352,7 +353,7 @@ func GetJointModule2(session *mcclient.ClientSession, mod1 Manager, mod2 Manager continue } for _, mod := range mods { - url, e := session.GetServiceVersionURL(mod.ServiceType(), mod.EndpointType()) + url, e := session.GetServiceVersionURL(mod.ServiceType(), mod.EndpointType(), httputils.POST) if e != nil { return nil, e } diff --git a/pkg/mcclient/modules/compute/mod_metadatas.go b/pkg/mcclient/modules/compute/mod_metadatas.go index 277dd3cb1b..b3c8bef68d 100644 --- a/pkg/mcclient/modules/compute/mod_metadatas.go +++ b/pkg/mcclient/modules/compute/mod_metadatas.go @@ -18,6 +18,7 @@ import ( "fmt" "yunion.io/x/jsonutils" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/printutils" "yunion.io/x/pkg/utils" @@ -96,7 +97,7 @@ func (this *MetadataManager) getModule(session *mcclient.ClientSession, params j } } - _, err := session.GetServiceURL(service, "") + _, err := session.GetServiceURL(service, "", httputils.POST) if err != nil { return nil, httperrors.NewNotFoundError("service %s not found error: %v", service, err) } diff --git a/pkg/mcclient/modules/compute/mod_skus.go b/pkg/mcclient/modules/compute/mod_skus.go index acc71c0f40..f315f486f4 100644 --- a/pkg/mcclient/modules/compute/mod_skus.go +++ b/pkg/mcclient/modules/compute/mod_skus.go @@ -94,7 +94,7 @@ func (self *SkusManager) GetSkus(s *mcclient.ClientSession, providerId, regionId } func (self *OfflineCloudmetaManager) GetSkuSourcesMeta(s *mcclient.ClientSession, client *http.Client) (jsonutils.JSONObject, error) { - baseUrl, err := s.GetServiceVersionURL(self.ServiceType(), self.EndpointType()) + baseUrl, err := s.GetServiceVersionURL(self.ServiceType(), self.EndpointType(), httputils.GET) if err != nil { return nil, err } diff --git a/pkg/mcclient/modules/webconsole/mod_webconsole.go b/pkg/mcclient/modules/webconsole/mod_webconsole.go index 90c72d1098..0d58852fcc 100644 --- a/pkg/mcclient/modules/webconsole/mod_webconsole.go +++ b/pkg/mcclient/modules/webconsole/mod_webconsole.go @@ -20,6 +20,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" compute_api "yunion.io/x/onecloud/pkg/apis/compute" webconsole_api "yunion.io/x/onecloud/pkg/apis/webconsole" @@ -198,7 +199,7 @@ func (m WebConsoleManager) doActionWithClimcPod( func (m WebConsoleManager) doCloudShell(s *mcclient.ClientSession, info *webconsole_api.SK8sShellDisplayInfo, cmd string, args ...string) (jsonutils.JSONObject, error) { endpointType := "internal" - authUrl, err := s.GetServiceURL("identity", endpointType) + authUrl, err := s.GetServiceURL("identity", endpointType, httputils.POST) if err != nil { return nil, httperrors.NewNotFoundError("auth_url not found") } diff --git a/pkg/mcclient/session.go b/pkg/mcclient/session.go index f19819acf1..280b546373 100644 --- a/pkg/mcclient/session.go +++ b/pkg/mcclient/session.go @@ -129,8 +129,8 @@ func (cliss *ClientSession) GetServiceName(service string) string { return service } -func (cliss *ClientSession) GetServiceURL(service, endpointType string) (string, error) { - return cliss.GetServiceVersionURL(service, endpointType) +func (cliss *ClientSession) GetServiceURL(service, endpointType string, method httputils.THttpMethod) (string, error) { + return cliss.GetServiceVersionURL(service, endpointType, method) } func (cliss *ClientSession) SetServiceCatalog(catalog IServiceCatalog) { @@ -144,33 +144,43 @@ func (cliss *ClientSession) GetServiceCatalog() IServiceCatalog { return cliss.client.GetServiceCatalog() } -func (cliss *ClientSession) GetServiceVersionURL(service, endpointType string) (string, error) { - urls, err := cliss.GetServiceVersionURLs(service, endpointType) +func (cliss *ClientSession) GetServiceVersionURL(service, endpointType string, method httputils.THttpMethod) (string, error) { + urls, err := cliss.GetServiceVersionURLs(service, endpointType, method) if err != nil { return "", errors.Wrap(err, "GetServiceVersionURLs") } return urls[rand.Intn(len(urls))], nil } -func (cliss *ClientSession) GetServiceURLs(service, endpointType string) ([]string, error) { - return cliss.GetServiceVersionURLs(service, endpointType) +func (cliss *ClientSession) GetServiceURLs(service, endpointType string, method httputils.THttpMethod) ([]string, error) { + return cliss.GetServiceVersionURLs(service, endpointType, method) } -func (cliss *ClientSession) GetServiceVersionURLs(service, endpointType string) ([]string, error) { +func (cliss *ClientSession) GetServiceVersionURLs(service, endpointType string, method httputils.THttpMethod) ([]string, error) { + return cliss.GetServiceVersionURLsByMethod(service, endpointType, method) +} + +func (cliss *ClientSession) GetServiceVersionURLsByMethod(service, endpointType string, method httputils.THttpMethod) ([]string, error) { if len(cliss.endpointType) > 0 { // session specific endpoint type should override the input endpointType, which is supplied by manager endpointType = cliss.endpointType } service = cliss.GetServiceName(service) if endpointType == api.EndpointInterfaceApigateway { - return cliss.getApigatewayServiceURLs(service, cliss.region, cliss.zone, endpointType) + return cliss.getApigatewayServiceURLs(service, cliss.region, cliss.zone) } else { + if (endpointType == "" || endpointType == api.EndpointInterfaceInternal) && (method == httputils.GET || method == httputils.HEAD) { + urls, _ := cliss.getServiceVersionURLs(service, cliss.region, cliss.zone, api.EndpointInterfaceSlave) + if len(urls) > 0 { + return urls, nil + } + } return cliss.getServiceVersionURLs(service, cliss.region, cliss.zone, endpointType) } } -func (cliss *ClientSession) getApigatewayServiceURLs(service, region, zone, endpointType string) ([]string, error) { - urls, err := cliss.getServiceVersionURLs(service, region, zone, "") +func (cliss *ClientSession) getApigatewayServiceURLs(service, region, zone string) ([]string, error) { + urls, err := cliss.getServiceVersionURLs(service, region, zone, api.EndpointInterfaceInternal) if err != nil { return nil, errors.Wrap(err, "getServiceVersionURLs") } @@ -231,14 +241,14 @@ func (cliss *ClientSession) getServiceVersionURLs(service, region, zone, endpoin return urls, err } -func (cliss *ClientSession) GetBaseUrl(service, endpointType string) (string, error) { +func (cliss *ClientSession) GetBaseUrl(service, endpointType string, method httputils.THttpMethod) (string, error) { if len(service) > 0 { if strings.HasPrefix(service, "http://") || strings.HasPrefix(service, "https://") { return service, nil } else if url, ok := cliss.customizeServiceUrl[service]; ok { return url, nil } else { - return cliss.GetServiceVersionURL(service, endpointType) + return cliss.GetServiceVersionURL(service, endpointType, method) } } else { return "", fmt.Errorf("Empty service type or baseURL") @@ -251,7 +261,7 @@ func (cliss *ClientSession) RawBaseUrlRequest( headers http.Header, body io.Reader, baseurlFactory func(string) string, ) (*http.Response, error) { - baseurl, err := cliss.GetBaseUrl(service, endpointType) + baseurl, err := cliss.GetBaseUrl(service, endpointType, method) if err != nil { return nil, err } @@ -288,7 +298,7 @@ func (cliss *ClientSession) JSONVersionRequest( service, endpointType string, method httputils.THttpMethod, url string, headers http.Header, body jsonutils.JSONObject, ) (http.Header, jsonutils.JSONObject, error) { - baseUrl, err := cliss.GetBaseUrl(service, endpointType) + baseUrl, err := cliss.GetBaseUrl(service, endpointType, method) if err != nil { return headers, nil, err } @@ -372,7 +382,7 @@ func (cliss *ClientSession) SetServiceUrl(service, url string) { } func (cliss *ClientSession) WithTaskCallback(taskId string, req func() error) error { - baseUrl, err := cliss.GetBaseUrl(consts.GetServiceType(), api.EndpointInterfacePublic) + baseUrl, err := cliss.GetBaseUrl(consts.GetServiceType(), api.EndpointInterfacePublic, httputils.POST) if err != nil { log.Errorf("GetServiceURLs error: %s", err) return errors.Wrap(err, "GetServiceURLs") diff --git a/pkg/mcclient/token.go b/pkg/mcclient/token.go index 77ffef0a44..5500c79a6f 100644 --- a/pkg/mcclient/token.go +++ b/pkg/mcclient/token.go @@ -23,19 +23,19 @@ import ( ) type ExternalService struct { - Name string - Url string + Name string `json:"name"` + Url string `json:"url"` - Service string + Service string `json:"service"` } type Endpoint struct { - Id string - RegionId string - ServiceId string - ServiceName string - Url string - Interface string + Id string `json:"id"` + RegionId string `json:"region_id"` + ServiceId string `json:"service_id"` + ServiceName string `json:"service_name"` + Url string `json:"url"` + Interface string `json:"interface"` } func OwnerIdString(owner IIdentityProvider, scope rbacscope.TRbacScope) string { diff --git a/pkg/mcclient/token3.go b/pkg/mcclient/token3.go index aeca41957a..71273655e9 100644 --- a/pkg/mcclient/token3.go +++ b/pkg/mcclient/token3.go @@ -42,6 +42,7 @@ type KeystoneEndpointV3 struct { // | public | 外部接口 | // | admin | 管理类型接口,deprecated | // | console | web控制台接口,指定显示在web控制台的外部服务的接口地址 | + // | slave | 从节点接口,用于读取数据 | // Interface string `json:"interface"` // 区域名称 @@ -360,12 +361,12 @@ func (catalog KeystoneServiceCatalogV3) getEndpoints(region string, endpointType endpoint := catalog[i].Endpoints[j] if (endpoint.RegionId == region || strings.HasPrefix(endpoint.RegionId, region+"-")) && endpoint.Interface == endpointType { endpoints = append(endpoints, Endpoint{ - endpoint.Id, - endpoint.RegionId, - catalog[i].Id, - catalog[i].Name, - endpoint.Url, - endpoint.Interface, + Id: endpoint.Id, + RegionId: endpoint.RegionId, + ServiceId: catalog[i].Id, + ServiceName: catalog[i].Name, + Url: endpoint.Url, + Interface: endpoint.Interface, }) } } @@ -405,14 +406,14 @@ func (catalog KeystoneServiceCatalogV3) getServiceURL(service, region, zone, end func (catalog KeystoneServiceCatalogV3) getServiceURLs(service, region, zone, endpointType string) ([]string, error) { if endpointType == "" { - endpointType = "internalURL" + endpointType = "internal" } + const MAX_ENDPOINTS = 4 for i := 0; i < len(catalog); i++ { if service == catalog[i].Type { if len(catalog[i].Endpoints) == 0 { continue } - var selected []string regeps := make(map[string][]string) regionzone := "" if len(zone) > 0 { @@ -426,32 +427,33 @@ func (catalog KeystoneServiceCatalogV3) getServiceURLs(service, region, zone, en len(region) == 0) { _, ok := regeps[ep.RegionId] if !ok { - regeps[ep.RegionId] = make([]string, 0) + regeps[ep.RegionId] = make([]string, 0, MAX_ENDPOINTS) } regeps[ep.RegionId] = append(regeps[ep.RegionId], ep.Url) } } if len(region) == 0 { if len(regeps) >= 1 { - for _, v := range regeps { - selected = v - break + for k := range regeps { + return regeps[k], nil } } else { return nil, fmt.Errorf("No default region for region(%s) zone(%s)", region, zone) } } else { + var selected []string _, ok := regeps[regionzone] if ok { selected = regeps[regionzone] + } else if _, ok := regeps[region]; ok { + selected = regeps[region] + } + if len(selected) == 0 { + return nil, fmt.Errorf("No valid %s endpoints for %s in region %s", endpointType, service, RegionID(region, zone)) } else { - selected, ok = regeps[region] - if !ok { - return nil, fmt.Errorf("No valid %s endpoints for %s in region %s", endpointType, service, RegionID(region, zone)) - } + return selected, nil } } - return selected, nil } } return nil, errors.Wrapf(httperrors.ErrNotFound, "No such service %s", service) diff --git a/pkg/monitor/datasource/datasource.go b/pkg/monitor/datasource/datasource.go index c2f4908852..7baf66aeb4 100644 --- a/pkg/monitor/datasource/datasource.go +++ b/pkg/monitor/datasource/datasource.go @@ -22,6 +22,7 @@ import ( "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" "yunion.io/x/pkg/util/stringutils" "yunion.io/x/pkg/util/wait" @@ -153,7 +154,7 @@ func (man *dataSourceManager) initDefaultDataSource(ctx context.Context) error { if err := tsdb.IsValidDataSource(dsSvc); err != nil { return errors.Wrapf(err, "invalid type %q", dsSvc) } - url, err := s.GetServiceURL(dsSvc, epType) + url, err := s.GetServiceURL(dsSvc, epType, httputils.POST) if err != nil { return errors.Errorf("get %q public url: %v", dsSvc, err) } diff --git a/pkg/monitor/service/handlers.go b/pkg/monitor/service/handlers.go index 90d7f2b356..6550ffec3d 100644 --- a/pkg/monitor/service/handlers.go +++ b/pkg/monitor/service/handlers.go @@ -33,7 +33,7 @@ import ( "yunion.io/x/onecloud/pkg/monitor/options" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegisterModelManager(db.TenantCacheManager) @@ -41,7 +41,7 @@ func InitHandlers(app *appsrv.Application) { db.RegisterModelManager(db.RoleCacheManager) db.RegistUserCredCacheUpdater() - taskman.AddTaskHandler("", app) + taskman.AddTaskHandler("", app, isSlave) for _, manager := range []db.IModelManager{ taskman.TaskManager, @@ -73,14 +73,14 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } for _, manager := range []db.IModelManager{ models.UnifiedMonitorManager, } { handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } for _, manager := range []db.IJointModelManager{ @@ -92,7 +92,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewJointModelHandler(manager) - dispatcher.AddJointModelDispatcher("", app, handler) + dispatcher.AddJointModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/monitor/service/service.go b/pkg/monitor/service/service.go index dd97e6f285..ae6870eecc 100644 --- a/pkg/monitor/service/service.go +++ b/pkg/monitor/service/service.go @@ -70,14 +70,14 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() - go startServices() - if !opts.IsSlaveNode { + go startServices() + err := taskman.TaskManager.InitializeData() if err != nil { log.Fatalf("TaskManager.InitializeData fail %s", err) diff --git a/pkg/notify/service/handlers.go b/pkg/notify/service/handlers.go index 1e66c8538a..f73c5ecbc8 100644 --- a/pkg/notify/service/handlers.go +++ b/pkg/notify/service/handlers.go @@ -27,7 +27,7 @@ const ( API_VERSION = "v2" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() models.InitEventLog() @@ -35,7 +35,7 @@ func InitHandlers(app *appsrv.Application) { db.RegistUserCredCacheUpdater() - taskman.AddTaskHandler(API_VERSION, app) + taskman.AddTaskHandler(API_VERSION, app, isSlave) db.AddScopeResourceCountHandler(API_VERSION, app) @@ -74,7 +74,7 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher(API_VERSION, app, handler) + dispatcher.AddModelDispatcher(API_VERSION, app, handler, isSlave) } for _, manager := range []db.IJointModelManager{ models.SubscriberReceiverManager, @@ -86,6 +86,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewJointModelHandler(manager) - dispatcher.AddJointModelDispatcher(API_VERSION, app, handler) + dispatcher.AddJointModelDispatcher(API_VERSION, app, handler, isSlave) } } diff --git a/pkg/notify/service/service.go b/pkg/notify/service/service.go index 982679971b..8a15ca60e1 100644 --- a/pkg/notify/service/service.go +++ b/pkg/notify/service/service.go @@ -57,20 +57,20 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(applicaion) + InitHandlers(applicaion, opts.IsSlaveNode) // init database db.EnsureAppSyncDB(applicaion, dbOpts, models.InitDB) defer cloudcommon.CloseDB() - if options.Options.EnableWatchUser { - err := models.ReceiverManager.StartWatchUserInKeystone() - if err != nil { - log.Errorln(errors.Wrap(err, "StartWatchUserInKeystone")) - } - } - if !opts.IsSlaveNode { + if options.Options.EnableWatchUser { + err := models.ReceiverManager.StartWatchUserInKeystone() + if err != nil { + log.Errorln(errors.Wrap(err, "StartWatchUserInKeystone")) + } + } + err := taskman.TaskManager.InitializeData() if err != nil { log.Fatalf("TaskManager.InitializeData fail %s", err) diff --git a/pkg/scheduledtask/service/handlers.go b/pkg/scheduledtask/service/handlers.go index b5d7f898d1..087755a4a1 100644 --- a/pkg/scheduledtask/service/handlers.go +++ b/pkg/scheduledtask/service/handlers.go @@ -21,7 +21,7 @@ import ( "yunion.io/x/onecloud/pkg/scheduledtask/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.RegistUserCredCacheUpdater() db.AddScopeResourceCountHandler("", app) @@ -42,6 +42,6 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) } } diff --git a/pkg/scheduledtask/service/service.go b/pkg/scheduledtask/service/service.go index a43929e388..66b7248d00 100644 --- a/pkg/scheduledtask/service/service.go +++ b/pkg/scheduledtask/service/service.go @@ -46,15 +46,18 @@ func StartService() { cloudcommon.InitDB(dbOpts) - InitHandlers(applicaion) + InitHandlers(applicaion, opts.IsSlaveNode) db.EnsureAppSyncDB(applicaion, dbOpts, nil) defer cloudcommon.CloseDB() - cron := cronman.InitCronJobManager(true, 4, opts.TimeZone) - cron.AddJobAtIntervalsWithStartRun("ScheduledTaskCheck", time.Duration(60)*time.Second, models.ScheduledTaskManager.Timer, true) - cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + if !opts.IsSlaveNode { + cron := cronman.InitCronJobManager(true, 4, opts.TimeZone) + cron.AddJobAtIntervalsWithStartRun("ScheduledTaskCheck", time.Duration(60)*time.Second, models.ScheduledTaskManager.Timer, true) + cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + + go cron.Start() + } - go cron.Start() app.ServeForever(applicaion, baseOpts) } diff --git a/pkg/webconsole/service/handlers.go b/pkg/webconsole/service/handlers.go index 7283be505f..2a54c39dfb 100644 --- a/pkg/webconsole/service/handlers.go +++ b/pkg/webconsole/service/handlers.go @@ -59,25 +59,28 @@ const ( WebsocketProxyPathPrefix = "/wsproxy/" ) -func initHandlers(app *appsrv.Application) { - app.AddHandler("POST", ApiPathPrefix+"k8s//shell", auth.Authenticate(handleK8sShell)) - app.AddHandler("POST", ApiPathPrefix+"climc/shell", auth.Authenticate(handleClimcShell)) - app.AddHandler("POST", ApiPathPrefix+"k8s//log", auth.Authenticate(handleK8sLog)) - app.AddHandler("POST", ApiPathPrefix+"baremetal/", auth.Authenticate(handleBaremetalShell)) - app.AddHandler("POST", ApiPathPrefix+"ssh/", auth.Authenticate(handleSshShell)) - app.AddHandler("POST", ApiPathPrefix+"server/", auth.Authenticate(handleServerRemoteConsole)) - app.AddHandler("POST", ApiPathPrefix+"adb//shell", auth.Authenticate(handleAdbShell)) - app.AddHandler("POST", ApiPathPrefix+"server-rdp/", auth.Authenticate(handleServerRemoteRDPConsole)) +func initHandlers(app *appsrv.Application, isSlave bool) { app.AddHandler("GET", ApiPathPrefix+"sftp//list", server.HandleSftpList) app.AddHandler("GET", ApiPathPrefix+"sftp//download", server.HandleSftpDownload) - app.AddHandler("POST", ApiPathPrefix+"sftp//upload", server.HandleSftpUpload) + + if !isSlave { + app.AddHandler("POST", ApiPathPrefix+"k8s//shell", auth.Authenticate(handleK8sShell)) + app.AddHandler("POST", ApiPathPrefix+"climc/shell", auth.Authenticate(handleClimcShell)) + app.AddHandler("POST", ApiPathPrefix+"k8s//log", auth.Authenticate(handleK8sLog)) + app.AddHandler("POST", ApiPathPrefix+"baremetal/", auth.Authenticate(handleBaremetalShell)) + app.AddHandler("POST", ApiPathPrefix+"ssh/", auth.Authenticate(handleSshShell)) + app.AddHandler("POST", ApiPathPrefix+"server/", auth.Authenticate(handleServerRemoteConsole)) + app.AddHandler("POST", ApiPathPrefix+"adb//shell", auth.Authenticate(handleAdbShell)) + app.AddHandler("POST", ApiPathPrefix+"server-rdp/", auth.Authenticate(handleServerRemoteRDPConsole)) + app.AddHandler("POST", ApiPathPrefix+"sftp//upload", server.HandleSftpUpload) + } for _, man := range []db.IModelManager{ models.GetCommandLogManager(), } { db.RegisterModelManager(man) handler := db.NewModelHandler(man) - dispatcher.AddModelDispatcher(ApiPathPrefix, app, handler) + dispatcher.AddModelDispatcher(ApiPathPrefix, app, handler, isSlave) } } diff --git a/pkg/webconsole/service/service.go b/pkg/webconsole/service/service.go index 5a7bff6810..3b930fdd46 100644 --- a/pkg/webconsole/service/service.go +++ b/pkg/webconsole/service/service.go @@ -77,7 +77,7 @@ func start() { cloudcommon.InitDB(dbOpts) - initHandlers(app) + initHandlers(app, baseOpts.IsSlaveNode) db.EnsureAppSyncDB(app, dbOpts, models.InitDB) @@ -100,12 +100,14 @@ func start() { // misc handler appsrv.AddMiscHandlersToMuxRouter(app, root, o.Options.EnableAppProfiling) - cron := cronman.InitCronJobManager(true, o.Options.CronJobWorkerCount, o.Options.TimeZone) + if !baseOpts.IsSlaveNode { + cron := cronman.InitCronJobManager(true, o.Options.CronJobWorkerCount, o.Options.TimeZone) - cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) - cron.Start() - defer cron.Stop() + cron.Start() + defer cron.Stop() + } addr := net.JoinHostPort(o.Options.Address, strconv.Itoa(o.Options.Port)) log.Infof("Start listen on %s", addr) diff --git a/pkg/yunionconf/service/handlers.go b/pkg/yunionconf/service/handlers.go index dc11c62d72..826800ca68 100644 --- a/pkg/yunionconf/service/handlers.go +++ b/pkg/yunionconf/service/handlers.go @@ -32,11 +32,11 @@ import ( "yunion.io/x/onecloud/pkg/yunionconf/models" ) -func InitHandlers(app *appsrv.Application) { +func InitHandlers(app *appsrv.Application, isSlave bool) { db.InitAllManagers() db.AddScopeResourceCountHandler("", app) - addBugReportHandler("", app) + addBugReportHandler("", app, isSlave) for _, manager := range []db.IModelManager{ db.UserCacheManager, @@ -55,16 +55,19 @@ func InitHandlers(app *appsrv.Application) { } { db.RegisterModelManager(manager) handler := db.NewModelHandler(manager) - dispatcher.AddModelDispatcher("", app, handler) + dispatcher.AddModelDispatcher("", app, handler, isSlave) if manager == models.ParameterManager { - dispatcher.AddModelDispatcher("/users/", app, handler) - dispatcher.AddModelDispatcher("/services/", app, handler) + dispatcher.AddModelDispatcher("/users/", app, handler, isSlave) + dispatcher.AddModelDispatcher("/services/", app, handler, isSlave) } } } -func addBugReportHandler(prefix string, app *appsrv.Application) { +func addBugReportHandler(prefix string, app *appsrv.Application, isSlave bool) { app.AddHandler("GET", fmt.Sprintf("%s/bug-report-status", prefix), bugReportStatusHandler) + if isSlave { + return + } app.AddHandler("POST", fmt.Sprintf("%s/enable-bug-report", prefix), enableBugReportHandler) app.AddHandler("POST", fmt.Sprintf("%s/disable-bug-report", prefix), disableBugReportHandler) app.AddHandler("POST", fmt.Sprintf("%s/send-bug-report", prefix), sendBugReportHandler) diff --git a/pkg/yunionconf/service/service.go b/pkg/yunionconf/service/service.go index 6f58b31af7..0383cfe329 100644 --- a/pkg/yunionconf/service/service.go +++ b/pkg/yunionconf/service/service.go @@ -55,7 +55,7 @@ func StartService() { session := auth.GetAdminSession(ctx, baseOpts.Region) notifyclient.EventNotifyServiceAbnormal(ctx, session.GetToken(), consts.GetServiceType(), method, path, body, err) }) - InitHandlers(app) + InitHandlers(app, opts.IsSlaveNode) db.AppDBInit(app) if db.CheckSync(opts.AutoSyncTable, opts.EnableDBChecksumTables, opts.DBChecksumSkipInit) {