From 1aee43ab031d4871bc8663fdeb0a7e367ffb1020 Mon Sep 17 00:00:00 2001 From: Qiu Jian Date: Thu, 16 Jun 2022 21:24:19 +0800 Subject: [PATCH] fix: scheduler options not updated by OptionManager --- pkg/cloudcommon/options/options.go | 1 + pkg/compute/guestdrivers/openstack.go | 8 ++-- .../algorithm/predicates/baremetal/base.go | 2 +- .../algorithm/predicates/quota_predicate.go | 2 +- pkg/scheduler/api/sched.go | 2 +- pkg/scheduler/cache/candidate/default.go | 12 ++--- pkg/scheduler/cache/candidate/hosts.go | 8 ++-- pkg/scheduler/core/generic_scheduler.go | 6 +-- pkg/scheduler/handler/ping.go | 4 +- pkg/scheduler/manager/completed_queue.go | 8 ++-- pkg/scheduler/manager/expire_queue.go | 8 ++-- pkg/scheduler/manager/manager.go | 2 +- pkg/scheduler/manager/task_history.go | 4 +- pkg/scheduler/options/options.go | 17 +++---- pkg/scheduler/service/service.go | 46 ++++++++++--------- 15 files changed, 65 insertions(+), 65 deletions(-) diff --git a/pkg/cloudcommon/options/options.go b/pkg/cloudcommon/options/options.go index 9fde0e5a76..db94fe8f8b 100644 --- a/pkg/cloudcommon/options/options.go +++ b/pkg/cloudcommon/options/options.go @@ -325,6 +325,7 @@ func ParseOptions(optStruct interface{}, args []string, configFileName string, s if err != nil { log.Fatalf("Set log level %q: %v", optionsRef.LogLevel, err) } + log.Infof("Set log level to %q", optionsRef.LogLevel) log.Logger().Formatter = &log.TextFormatter{ TimestampFormat: "2006-01-02 15:04:05", } diff --git a/pkg/compute/guestdrivers/openstack.go b/pkg/compute/guestdrivers/openstack.go index c8cccd107c..6bd648261b 100644 --- a/pkg/compute/guestdrivers/openstack.go +++ b/pkg/compute/guestdrivers/openstack.go @@ -50,19 +50,19 @@ func init() { } func (self *SOpenStackGuestDriver) DoScheduleCPUFilter() bool { - return scheduler.GetOptions().OpenstackSchedulerCPUFilter + return scheduler.Options.OpenstackSchedulerCPUFilter } func (self *SOpenStackGuestDriver) DoScheduleMemoryFilter() bool { - return scheduler.GetOptions().OpenstackSchedulerMemoryFilter + return scheduler.Options.OpenstackSchedulerMemoryFilter } func (self *SOpenStackGuestDriver) DoScheduleSKUFilter() bool { - return scheduler.GetOptions().OpenstackSchedulerSKUFilter + return scheduler.Options.OpenstackSchedulerSKUFilter } func (self *SOpenStackGuestDriver) DoScheduleStorageFilter() bool { - return scheduler.GetOptions().OpenstackSchedulerStorageFilter + return scheduler.Options.OpenstackSchedulerStorageFilter } func (self *SOpenStackGuestDriver) GetHypervisor() string { diff --git a/pkg/scheduler/algorithm/predicates/baremetal/base.go b/pkg/scheduler/algorithm/predicates/baremetal/base.go index 1b42cf4174..b6d77300fa 100644 --- a/pkg/scheduler/algorithm/predicates/baremetal/base.go +++ b/pkg/scheduler/algorithm/predicates/baremetal/base.go @@ -27,7 +27,7 @@ type BasePredicate struct { } func (p *BasePredicate) PreExecute(ctx context.Context, u *core.Unit, cs []core.Candidater) (bool, error) { - if o.GetOptions().DisableBaremetalPredicates { + if o.Options.DisableBaremetalPredicates { return false, nil } return true, nil diff --git a/pkg/scheduler/algorithm/predicates/quota_predicate.go b/pkg/scheduler/algorithm/predicates/quota_predicate.go index 913690784a..7a7072218a 100644 --- a/pkg/scheduler/algorithm/predicates/quota_predicate.go +++ b/pkg/scheduler/algorithm/predicates/quota_predicate.go @@ -37,7 +37,7 @@ func (p *SQuotaPredicate) Clone() core.FitPredicate { } func (p *SQuotaPredicate) PreExecute(ctx context.Context, u *core.Unit, cs []core.Candidater) (bool, error) { - if !options.GetOptions().EnableQuotaCheck { + if !options.Options.EnableQuotaCheck { return false, nil } if len(u.SchedData().HostId) > 0 { diff --git a/pkg/scheduler/api/sched.go b/pkg/scheduler/api/sched.go index 017dcde165..20e3201f91 100644 --- a/pkg/scheduler/api/sched.go +++ b/pkg/scheduler/api/sched.go @@ -196,7 +196,7 @@ func (data *SchedInfo) reviseData() { data.IgnoreFilters = ignoreFilters if data.SuggestionLimit == 0 { - data.SuggestionLimit = int64(o.GetOptions().SchedulerTestLimit) + data.SuggestionLimit = int64(o.Options.SchedulerTestLimit) } data.Raw = input.JSON(input).String() diff --git a/pkg/scheduler/cache/candidate/default.go b/pkg/scheduler/cache/candidate/default.go index 95db084ed6..79fd7ed975 100644 --- a/pkg/scheduler/cache/candidate/default.go +++ b/pkg/scheduler/cache/candidate/default.go @@ -88,13 +88,13 @@ func generalGetUpdateFunc(isBaremetal bool) cache.GetUpdateFunc { fullUpdateCounter := 0 return func(d []interface{}) ([]string, error) { // Full update every 10 minutes(30s * 20) - if isBaremetal && fullUpdateCounter >= options.GetOptions().BaremetalCandidateCacheReloadCount { + if isBaremetal && fullUpdateCounter >= options.Options.BaremetalCandidateCacheReloadCount { fullUpdateCounter = 1 log.Infof("FullUpdateCounter: %d, update all baremetals.", fullUpdateCounter) return nil, nil } - if !isBaremetal && fullUpdateCounter >= options.GetOptions().HostCandidateCacheReloadCount { + if !isBaremetal && fullUpdateCounter >= options.Options.HostCandidateCacheReloadCount { fullUpdateCounter = 1 log.Infof("FullUpdateCounter: %d, update all hosts.", fullUpdateCounter) return nil, nil @@ -146,8 +146,8 @@ func newHostCache() cache.CachedItem { item.CachedItem = cache.NewCacheItem( HostCandidateCache, - u.ToDuration(options.GetOptions().HostCandidateCacheTTL), - u.ToDuration(options.GetOptions().HostCandidateCachePeriod), + u.ToDuration(options.Options.HostCandidateCacheTTL), + u.ToDuration(options.Options.HostCandidateCachePeriod), uuidKey, update, load, @@ -166,8 +166,8 @@ func newBaremetalCache() cache.CachedItem { item.CachedItem = cache.NewCacheItem( BaremetalCandidateCache, - u.ToDuration(options.GetOptions().BaremetalCandidateCacheTTL), - u.ToDuration(options.GetOptions().BaremetalCandidateCachePeriod), + u.ToDuration(options.Options.BaremetalCandidateCacheTTL), + u.ToDuration(options.Options.BaremetalCandidateCachePeriod), uuidKey, update, load, diff --git a/pkg/scheduler/cache/candidate/hosts.go b/pkg/scheduler/cache/candidate/hosts.go index cc84dca5cf..311aaea8d5 100644 --- a/pkg/scheduler/cache/candidate/hosts.go +++ b/pkg/scheduler/cache/candidate/hosts.go @@ -193,7 +193,7 @@ func NewGuestReservedResourceUsedByBuilder(b *HostBuilder, host *computemodels.S for _, g := range gst { dSize := guestDiskSize(&g, true) disk += int64(dSize) - if o.GetOptions().IgnoreNonrunningGuests && (g.Status == computeapi.VM_READY) { + if o.Options.IgnoreNonrunningGuests && (g.Status == computeapi.VM_READY) { continue } cpu += int64(g.VcpuCount) @@ -779,7 +779,7 @@ func (b *HostBuilder) build() ([]interface{}, error) { descResultLock.Unlock() } - workqueue.Parallelize(o.GetOptions().HostBuildParallelizeSize, len(b.hosts), buildOne) + workqueue.Parallelize(o.Options.HostBuildParallelizeSize, len(b.hosts), buildOne) schedDescs = schedDescs[:descedLen] if len(errs) > 0 { //return nil, errors.NewAggregate(errs) @@ -906,13 +906,13 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels var memFreeSize int64 var cpuFreeCount int64 - if o.GetOptions().IgnoreNonrunningGuests { + if o.Options.IgnoreNonrunningGuests { memFreeSize = desc.TotalMemSize - desc.RunningMemSize - desc.CreatingMemSize cpuFreeCount = desc.TotalCPUCount - desc.RunningCPUCount - desc.CreatingCPUCount } else { memFreeSize = desc.TotalMemSize - desc.RequiredMemSize cpuFreeCount = desc.TotalCPUCount - desc.RequiredCPUCount - if o.GetOptions().IgnoreFakeDeletedGuests { + if o.Options.IgnoreFakeDeletedGuests { memFreeSize += memFakeDeletedSize cpuFreeCount += cpuFakeDeletedCount } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index daa974574f..9ac5f09e96 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -424,7 +424,7 @@ func findCandidatesThatFit(ctx context.Context, unit *Unit, candidates []Candida unit.AppendFailedCandidates(fcs) } } - workerSize := o.GetOptions().PredicateParallelizeSize + workerSize := o.Options.PredicateParallelizeSize if workerSize == 0 { workerSize = 1 } @@ -534,7 +534,7 @@ func unitFitsOnCandidate( // the configured predicates even after one or more of them fails. // When the flag is set to false, scheduler skips checking the rest // of the predicates after it finds one predicate that failed. - if !o.GetOptions().AlwaysCheckAllPredicates { + if !o.Options.AlwaysCheckAllPredicates { break } } @@ -590,7 +590,7 @@ func PrioritizeCandidates( } } } - workqueue.Parallelize(o.GetOptions().PriorityParallelizeSize, len(candidates), processCandidate) + workqueue.Parallelize(o.Options.PriorityParallelizeSize, len(candidates), processCandidate) for i, p := range newPriorities { wg.Add(1) diff --git a/pkg/scheduler/handler/ping.go b/pkg/scheduler/handler/ping.go index 0c0cd87a41..a0c7a3992c 100644 --- a/pkg/scheduler/handler/ping.go +++ b/pkg/scheduler/handler/ping.go @@ -56,10 +56,10 @@ func switchHandler(c *gin.Context) { counter++ var result string if counter%2 == 1 { - o.GetOptions().DisableBaremetalPredicates = true + o.Options.DisableBaremetalPredicates = true result = fmt.Sprintf("ignore_baremetal_filter_switch is true") } else { - o.GetOptions().DisableBaremetalPredicates = false + o.Options.DisableBaremetalPredicates = false result = fmt.Sprintf("ignore_baremetal_filter_switch is false") } diff --git a/pkg/scheduler/manager/completed_queue.go b/pkg/scheduler/manager/completed_queue.go index 2f7cbc6668..102b9c2723 100644 --- a/pkg/scheduler/manager/completed_queue.go +++ b/pkg/scheduler/manager/completed_queue.go @@ -31,7 +31,7 @@ type CompletedManager struct { func NewCompletedManager(stopCh <-chan struct{}) *CompletedManager { return &CompletedManager{ - completedChannel: make(chan *api.CompletedNotifyArgs, o.GetOptions().CompletedQueueMaxLength), + completedChannel: make(chan *api.CompletedNotifyArgs, o.Options.CompletedQueueMaxLength), stopCh: stopCh, } } @@ -41,7 +41,7 @@ func (c *CompletedManager) Add(completedNotifyArgs *api.CompletedNotifyArgs) { } func (c *CompletedManager) Run() { - t := time.Tick(utils.ToDuration(o.GetOptions().CompletedQueueConsumptionPeriod)) + t := time.Tick(utils.ToDuration(o.Options.CompletedQueueConsumptionPeriod)) removeSession := func() { //completedNotifyArgs := <-c.completedChannel @@ -79,7 +79,7 @@ func (c *CompletedManager) Run() { wg.Wrap(removeSession) } - if ok := utils.WaitTimeOut(wg, time.Duration(completedRequestNumber)*utils.ToDuration(o.GetOptions().CompletedQueueConsumptionTimeout)); !ok { + if ok := utils.WaitTimeOut(wg, time.Duration(completedRequestNumber)*utils.ToDuration(o.Options.CompletedQueueConsumptionTimeout)); !ok { log.Errorln("time out reload data in completed when remove sessions.") } } @@ -98,7 +98,7 @@ func (c *CompletedManager) Run() { return default: // if sessions' number is bigger then 10 then reload and remove. - if len(c.completedChannel) >= o.GetOptions().CompletedQueueDealLength { + if len(c.completedChannel) >= o.Options.CompletedQueueDealLength { reloadAndRemoveSessions() } else { time.Sleep(10 * time.Second) diff --git a/pkg/scheduler/manager/expire_queue.go b/pkg/scheduler/manager/expire_queue.go index 3672ea193c..3b4ac753d7 100644 --- a/pkg/scheduler/manager/expire_queue.go +++ b/pkg/scheduler/manager/expire_queue.go @@ -35,7 +35,7 @@ type ExpireManager struct { func NewExpireManager(stopCh <-chan struct{}) *ExpireManager { return &ExpireManager{ - expireChannel: make(chan *api.ExpireArgs, o.GetOptions().ExpireQueueMaxLength), + expireChannel: make(chan *api.ExpireArgs, o.Options.ExpireQueueMaxLength), stopCh: stopCh, mergeLock: new(sync.Mutex), } @@ -62,7 +62,7 @@ func newExpireHost(id string, sid string) *expireHost { } func (e *ExpireManager) Run() { - t := time.Tick(u.ToDuration(o.GetOptions().ExpireQueueConsumptionPeriod)) + t := time.Tick(u.ToDuration(o.Options.ExpireQueueConsumptionPeriod)) // Watching the expires. for { select { @@ -77,7 +77,7 @@ func (e *ExpireManager) Run() { return default: // if expire number is bigger then 80 then update - if len(e.expireChannel) >= o.GetOptions().ExpireQueueDealLength { + if len(e.expireChannel) >= o.Options.ExpireQueueDealLength { e.batchMergeExpire() } else { time.Sleep(1 * time.Second) @@ -137,7 +137,7 @@ func (e *ExpireManager) batchMergeExpire() { schedManager.HistoryManager.CancelCandidatesPendingUsage(dirtyBaremetals) } }() - if ok := e.waitTimeOut(wg, u.ToDuration(o.GetOptions().ExpireQueueConsumptionTimeout)); !ok { + if ok := e.waitTimeOut(wg, u.ToDuration(o.Options.ExpireQueueConsumptionTimeout)); !ok { log.Errorln("time out reload data.") } } diff --git a/pkg/scheduler/manager/manager.go b/pkg/scheduler/manager/manager.go index a116a75121..4051305a5c 100644 --- a/pkg/scheduler/manager/manager.go +++ b/pkg/scheduler/manager/manager.go @@ -56,7 +56,7 @@ func NewSchedulerManager(stopCh <-chan struct{}) *SchedulerManager { sm.CompletedManager = NewCompletedManager(stopCh) sm.HistoryManager = NewHistoryManager(stopCh) sm.TaskManager = NewTaskManager(stopCh) - sm.KubeClusterManager = k8s.NewKubeClusterManager(o.GetOptions().Region, 30*time.Second) + sm.KubeClusterManager = k8s.NewKubeClusterManager(o.Options.Region, 30*time.Second) return sm } diff --git a/pkg/scheduler/manager/task_history.go b/pkg/scheduler/manager/task_history.go index a22e7db58c..a3d7c42077 100644 --- a/pkg/scheduler/manager/task_history.go +++ b/pkg/scheduler/manager/task_history.go @@ -107,7 +107,7 @@ type HistoryManager struct { func NewHistoryManager(stopCh <-chan struct{}) *HistoryManager { return &HistoryManager{ - capacity: o.GetOptions().SchedulerHistoryLimit, + capacity: o.Options.SchedulerHistoryLimit, historyMap: make(map[string]*HistoryItem), historyList: list.New(), normalHistoryList: list.New(), @@ -160,7 +160,7 @@ func (m *HistoryManager) cleanHistoryMap() { } func (m *HistoryManager) Run() { - go wait.Until(m.cleanHistoryMap, u.ToDuration(o.GetOptions().SchedulerHistoryCleanPeriod), m.stopCh) + go wait.Until(m.cleanHistoryMap, u.ToDuration(o.Options.SchedulerHistoryCleanPeriod), m.stopCh) } func (m *HistoryManager) GetHistoryList(offset int64, limit int64, all bool, isSuggestion bool) ([]*HistoryItem, int64) { diff --git a/pkg/scheduler/options/options.go b/pkg/scheduler/options/options.go index c0d9d15282..a7516becd8 100644 --- a/pkg/scheduler/options/options.go +++ b/pkg/scheduler/options/options.go @@ -28,7 +28,7 @@ type SchedulerOptions struct { SchedOptions // gin http framework mode - GinMode string `help:"gin http framework work mode" default:"debug" choices:"debug|release"` + // GinMode string `help:"gin http framework work mode" default:"debug" choices:"debug|release"` } type SchedOptions struct { @@ -120,16 +120,12 @@ func OnOpenstackOptionsChange(oOpts, nOpts interface{}) bool { } var ( - opt SchedulerOptions + Options SchedulerOptions ) -func GetOptions() *SchedulerOptions { - return &opt -} - func Init() { - common_options.ParseOptions(&opt, os.Args, "region.conf", api.SERVICE_TYPE) - options.Options = opt.ComputeOptions + common_options.ParseOptions(&Options, os.Args, "region.conf", api.SERVICE_TYPE) + options.Options = Options.ComputeOptions } func OnOptionsChange(oldO, newO interface{}) bool { @@ -143,10 +139,11 @@ func OnOptionsChange(oldO, newO interface{}) bool { if common_options.OnDBOptionsChange(&oldOpts.DBOptions, &newOpts.DBOptions) { changed = true } - - if OnOptionsChange(&oldOpts.OpenstackOptions, &newOpts.OpenstackOptions) { + if OnOpenstackOptionsChange(&oldOpts.OpenstackOptions, &newOpts.OpenstackOptions) { changed = true } + options.Options = newOpts.ComputeOptions + return changed } diff --git a/pkg/scheduler/service/service.go b/pkg/scheduler/service/service.go index 5f224ff40e..b3fe21e541 100644 --- a/pkg/scheduler/service/service.go +++ b/pkg/scheduler/service/service.go @@ -46,21 +46,11 @@ import ( func StartService() error { o.Init() - opts := o.GetOptions() - dbOpts := &opts.DBOptions + dbOpts := o.Options.DBOptions - // gin http framework mode configuration - gin.SetMode(opts.GinMode) - - startSched := func() { - stopEverything := make(chan struct{}) - go skuman.Start(utils.ToDuration(opts.SkuRefreshInterval)) - schedman.InitAndStart(stopEverything) - } - - opts.Port = opts.SchedulerPort + o.Options.Port = o.Options.SchedulerPort // init region compute models - cloudcommon.InitDB(dbOpts) + cloudcommon.InitDB(&dbOpts) defer cloudcommon.CloseDB() db.InitAllManagers() @@ -80,23 +70,36 @@ func StartService() error { count++ } - commonOpts := &opts.CommonOptions + commonOpts := &o.Options.CommonOptions app_common.InitAuth(commonOpts, func() { log.Infof("Auth complete!!") - startSched() }) + common_options.StartOptionManager(&o.Options, o.Options.ConfigSyncPeriodSeconds, compute_api.SERVICE_TYPE, compute_api.SERVICE_VERSION, o.OnOptionsChange) + + // gin http framework mode configuration + ginMode := "release" + if o.Options.LogLevel == "debug" { + ginMode = "debug" + } + gin.SetMode(ginMode) + + startSched := func() { + stopEverything := make(chan struct{}) + go skuman.Start(utils.ToDuration(o.Options.SkuRefreshInterval)) + schedman.InitAndStart(stopEverything) + } + startSched() + if err := computemodels.InitDB(); err != nil { log.Fatalf("InitDB fail: %s", err) } - common_options.StartOptionManager(&opts, opts.ConfigSyncPeriodSeconds, compute_api.SERVICE_TYPE, compute_api.SERVICE_VERSION, o.OnOptionsChange) - - app := app_common.InitApp(&opts.BaseOptions, true) + app := app_common.InitApp(&o.Options.BaseOptions, true) db.AppDBInit(app) //InitHandlers(app) - return startHTTP(opts) + return startHTTP(&o.Options) } func startHTTP(opt *o.SchedulerOptions) error { @@ -117,9 +120,8 @@ func startHTTP(opt *o.SchedulerOptions) error { log.Infof("Start server on: %s:%d", opt.Address, opt.Port) - if o.GetOptions().EnableSsl { - return server.ListenAndServeTLS(o.GetOptions().SslCertfile, - o.GetOptions().SslKeyfile) + if opt.EnableSsl { + return server.ListenAndServeTLS(opt.SslCertfile, opt.SslKeyfile) } else { return server.ListenAndServe() }