From 2c5ed4b3607a96109a4e6f288ba1dddfd45b04a8 Mon Sep 17 00:00:00 2001 From: Zexi Li Date: Tue, 11 Dec 2018 18:23:09 +0800 Subject: [PATCH] scheduler: make schedtag prefer and avoid strategy work --- pkg/scheduler/algorithm/plugin/plugin.go | 12 + .../predicates/aggregate_predicate.go | 56 ++-- .../predicates/baremetal/network_predicate.go | 10 +- .../predicates/guest/group_predicate.go | 50 ++- .../predicates/guest/network_predicate.go | 19 +- .../priorities/guest/avoid_same_host.go | 2 +- .../algorithm/priorities/guest/capacity.go | 2 +- .../algorithm/priorities/guest/creating.go | 4 +- .../algorithm/priorities/guest/lowload.go | 7 +- .../algorithm/priorities/priorities.go | 49 ++- pkg/scheduler/algorithmprovider/defaults.go | 1 - pkg/scheduler/core/context.go | 95 +++--- pkg/scheduler/core/generic_scheduler.go | 63 ++-- pkg/scheduler/core/score/score.go | 294 ++++++++++++++++++ pkg/scheduler/core/score/score_test.go | 156 ++++++++++ pkg/scheduler/core/types.go | 17 +- pkg/util/hashcache/doc.go | 1 + 17 files changed, 642 insertions(+), 196 deletions(-) create mode 100644 pkg/scheduler/algorithm/plugin/plugin.go create mode 100644 pkg/scheduler/core/score/score.go create mode 100644 pkg/scheduler/core/score/score_test.go create mode 100644 pkg/util/hashcache/doc.go diff --git a/pkg/scheduler/algorithm/plugin/plugin.go b/pkg/scheduler/algorithm/plugin/plugin.go new file mode 100644 index 0000000000..7eba61ab19 --- /dev/null +++ b/pkg/scheduler/algorithm/plugin/plugin.go @@ -0,0 +1,12 @@ +package plugin + +import ( + "yunion.io/x/onecloud/pkg/scheduler/core" +) + +type BasePlugin struct{} + +// Customize priority +func (p BasePlugin) OnPriorityEnd(u *core.Unit, c core.Candidater) {} + +func (p BasePlugin) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) {} diff --git a/pkg/scheduler/algorithm/predicates/aggregate_predicate.go b/pkg/scheduler/algorithm/predicates/aggregate_predicate.go index 3b869f94bd..6563aaf7b0 100644 --- a/pkg/scheduler/algorithm/predicates/aggregate_predicate.go +++ b/pkg/scheduler/algorithm/predicates/aggregate_predicate.go @@ -6,8 +6,10 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" + "yunion.io/x/onecloud/pkg/scheduler/algorithm/plugin" "yunion.io/x/onecloud/pkg/scheduler/api" "yunion.io/x/onecloud/pkg/scheduler/core" + "yunion.io/x/onecloud/pkg/scheduler/core/score" "yunion.io/x/onecloud/pkg/scheduler/db/models" "yunion.io/x/onecloud/pkg/util/conditionparser" ) @@ -23,6 +25,7 @@ import ( // the host is available. type AggregatePredicate struct { BasePredicate + plugin.BasePlugin AggregateHosts hostsAggregatesMap RequireAggregates []api.Aggregate ExcludeAggregates []api.Aggregate @@ -193,33 +196,6 @@ func getHostAggregateCount(inAggs []api.Aggregate, hAggs []*models.Aggregate, st return } -func (p *AggregatePredicate) OnSelect(u *core.Unit, c core.Candidater) bool { - hostAggs, ok := p.AggregateHosts[c.IndexKey()] - if !ok { - return true - } - - avoidCountMap := getHostAggregateCount(p.AvoidAggregates, hostAggs, api.AggregateStrategyAvoid) - preferCountMap := getHostAggregateCount(p.PreferAggregates, hostAggs, api.AggregateStrategyPrefer) - - setScore := func(aggCountMap map[string]int, postiveScore bool) { - stepScore := core.PriorityStep - if !postiveScore { - stepScore = -stepScore - } - for n, count := range aggCountMap { - u.IncreaseScore(c.IndexKey(), n, count*stepScore) - } - } - - setScore(avoidCountMap, false) - setScore(preferCountMap, true) - - return true -} - -func (p *AggregatePredicate) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) {} - func (p *AggregatePredicate) Execute(u *core.Unit, c core.Candidater) (bool, []core.PredicateFailureReason, error) { h := NewPredicateHelper(p, u, c) @@ -281,3 +257,29 @@ func (p *AggregatePredicate) exec(h *PredicateHelper) string { return "" } + +func (p *AggregatePredicate) OnPriorityEnd(u *core.Unit, c core.Candidater) { + hostAggs, ok := p.AggregateHosts[c.IndexKey()] + if !ok { + return + } + + avoidCountMap := getHostAggregateCount(p.AvoidAggregates, hostAggs, api.AggregateStrategyAvoid) + preferCountMap := getHostAggregateCount(p.PreferAggregates, hostAggs, api.AggregateStrategyPrefer) + + setScore := func(aggCountMap map[string]int, postiveScore bool) { + stepScore := core.PriorityStep + if !postiveScore { + stepScore = -stepScore + } + for n, count := range aggCountMap { + u.SetFrontScore( + c.IndexKey(), + score.NewScore(score.TScore(count*stepScore), n), + ) + } + } + + setScore(preferCountMap, true) + setScore(avoidCountMap, false) +} diff --git a/pkg/scheduler/algorithm/predicates/baremetal/network_predicate.go b/pkg/scheduler/algorithm/predicates/baremetal/network_predicate.go index 45f73219e7..973244cc0b 100644 --- a/pkg/scheduler/algorithm/predicates/baremetal/network_predicate.go +++ b/pkg/scheduler/algorithm/predicates/baremetal/network_predicate.go @@ -7,6 +7,7 @@ import ( "yunion.io/x/pkg/utils" + "yunion.io/x/onecloud/pkg/scheduler/algorithm/plugin" "yunion.io/x/onecloud/pkg/scheduler/algorithm/predicates" "yunion.io/x/onecloud/pkg/scheduler/api" "yunion.io/x/onecloud/pkg/scheduler/core" @@ -14,6 +15,7 @@ import ( type NetworkPredicate struct { BasePredicate + plugin.BasePlugin SelectedNetworks sync.Map } @@ -141,11 +143,3 @@ func (p *NetworkPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []cor return h.GetResult() } - -func (p *NetworkPredicate) OnSelect(u *core.Unit, c core.Candidater) bool { - u.SetFiltedData(c.IndexKey(), "networks", &p.SelectedNetworks) - return true -} - -func (p *NetworkPredicate) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) { -} diff --git a/pkg/scheduler/algorithm/predicates/guest/group_predicate.go b/pkg/scheduler/algorithm/predicates/guest/group_predicate.go index 3233bc1f77..9c9610c73c 100644 --- a/pkg/scheduler/algorithm/predicates/guest/group_predicate.go +++ b/pkg/scheduler/algorithm/predicates/guest/group_predicate.go @@ -3,14 +3,17 @@ package guest import ( "fmt" + "yunion.io/x/onecloud/pkg/scheduler/algorithm/plugin" "yunion.io/x/onecloud/pkg/scheduler/algorithm/predicates" "yunion.io/x/onecloud/pkg/scheduler/core" + "yunion.io/x/onecloud/pkg/scheduler/core/score" ) // GroupPredicate filter the packet based on the label information, // the same group of guests should avoid schedule on same host. type GroupPredicate struct { predicates.BasePredicate + plugin.BasePlugin ExcludeGroups []string RequireGroups []string @@ -47,33 +50,6 @@ func (p *GroupPredicate) PreExecute(u *core.Unit, cs []core.Candidater) (bool, e return true, nil } -func (p *GroupPredicate) OnSelect(u *core.Unit, c core.Candidater) bool { - if len(p.ExcludeGroups) > 0 { - return false - } - - if len(p.RequireGroups) > 0 { - // TODO: what? - } - - if len(p.AvoidGroups) > 0 { - u.IncreaseScore(c.IndexKey(), - p.Name()+":avoid", -core.PriorityStep*len(p.AvoidGroups), - ) - } - - if len(p.PreferGroups) > 0 { - u.IncreaseScore(c.IndexKey(), - p.Name()+":prefer", core.PriorityStep*len(p.PreferGroups), - ) - } - - return true -} - -func (p *GroupPredicate) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) { -} - func (p *GroupPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []core.PredicateFailureReason, error) { h := predicates.NewPredicateHelper(p, u, c) @@ -101,3 +77,23 @@ func (p *GroupPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []core. return h.GetResult() } + +func (p *GroupPredicate) OnPriorityEnd(u *core.Unit, c core.Candidater) { + if len(p.AvoidGroups) > 0 { + u.SetFrontScore( + c.IndexKey(), + score.NewScore( + score.TScore(-core.PriorityStep*len(p.AvoidGroups)), + p.Name()+":avoid", + )) + } + + if len(p.PreferGroups) > 0 { + u.SetFrontScore( + c.IndexKey(), + score.NewScore( + score.TScore(core.PriorityStep*len(p.PreferGroups)), + p.Name()+":prefer", + )) + } +} diff --git a/pkg/scheduler/algorithm/predicates/guest/network_predicate.go b/pkg/scheduler/algorithm/predicates/guest/network_predicate.go index be04fba623..ab3b888cc1 100644 --- a/pkg/scheduler/algorithm/predicates/guest/network_predicate.go +++ b/pkg/scheduler/algorithm/predicates/guest/network_predicate.go @@ -5,12 +5,14 @@ import ( "strings" "sync" + "yunion.io/x/pkg/util/sets" + "yunion.io/x/pkg/utils" + + "yunion.io/x/onecloud/pkg/scheduler/algorithm/plugin" "yunion.io/x/onecloud/pkg/scheduler/algorithm/predicates" "yunion.io/x/onecloud/pkg/scheduler/api" "yunion.io/x/onecloud/pkg/scheduler/core" networks "yunion.io/x/onecloud/pkg/scheduler/db/models" - "yunion.io/x/pkg/util/sets" - "yunion.io/x/pkg/utils" ) // NetworkPredicate will filter the current network information with @@ -18,6 +20,7 @@ import ( // randomly match the available network resources. type NetworkPredicate struct { predicates.BasePredicate + plugin.BasePlugin SelectedNetworks sync.Map } @@ -105,10 +108,6 @@ func (p *NetworkPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []cor p.SelectedNetworks.Store(n.ID, counter.GetCount()) counters.Add(counter) found = true - - if counters.GetCount() >= d.Count { - break - } } else { fullErrMsgs = append(fullErrMsgs, fmt.Sprintf("%s: %s", n.ID, strings.Join(errMsgs, ",")), @@ -202,11 +201,3 @@ func (p *NetworkPredicate) Execute(u *core.Unit, c core.Candidater) (bool, []cor return h.GetResult() } - -func (p *NetworkPredicate) OnSelect(u *core.Unit, c core.Candidater) bool { - u.SetFiltedData(c.IndexKey(), "networks", &p.SelectedNetworks) - return true -} - -func (p *NetworkPredicate) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) { -} diff --git a/pkg/scheduler/algorithm/priorities/guest/avoid_same_host.go b/pkg/scheduler/algorithm/priorities/guest/avoid_same_host.go index eb784a4213..46698dd13d 100644 --- a/pkg/scheduler/algorithm/priorities/guest/avoid_same_host.go +++ b/pkg/scheduler/algorithm/priorities/guest/avoid_same_host.go @@ -27,7 +27,7 @@ func (p *AvoidSameHostPriority) Map(u *core.Unit, c core.Candidater) (core.HostP ownerTenantID := u.SchedData().OwnerTenantID if count, ok := hc.Tenants[ownerTenantID]; ok && count > 0 { - h.SetScore(-50 * int(count)) + h.SetFrontRawScore(-1 * int(count)) } return h.GetResult() diff --git a/pkg/scheduler/algorithm/priorities/guest/capacity.go b/pkg/scheduler/algorithm/priorities/guest/capacity.go index 6b0201ab71..cb66c496a6 100644 --- a/pkg/scheduler/algorithm/priorities/guest/capacity.go +++ b/pkg/scheduler/algorithm/priorities/guest/capacity.go @@ -21,7 +21,7 @@ func (p *CapacityPriority) Map(u *core.Unit, c core.Candidater) (core.HostPriori h := priorities.NewPriorityHelper(p, u, c) capacity := u.GetCapacity(c.IndexKey()) - h.SetScore(50 * int(capacity)) + h.SetRawScore(int(capacity)) return h.GetResult() } diff --git a/pkg/scheduler/algorithm/priorities/guest/creating.go b/pkg/scheduler/algorithm/priorities/guest/creating.go index 7e3dc12c8e..9d6fe061ba 100644 --- a/pkg/scheduler/algorithm/priorities/guest/creating.go +++ b/pkg/scheduler/algorithm/priorities/guest/creating.go @@ -27,8 +27,8 @@ func (p *CreatingPriority) Map(u *core.Unit, c core.Candidater) (core.HostPriori } if hc.CreatingGuestCount > 0 { - score := -int(hc.CreatingGuestCount) * 20 - h.SetScore(score) + score := -int(hc.CreatingGuestCount) + h.SetFrontScore(score) } return h.GetResult() diff --git a/pkg/scheduler/algorithm/priorities/guest/lowload.go b/pkg/scheduler/algorithm/priorities/guest/lowload.go index cf74609fec..79d56f6c84 100644 --- a/pkg/scheduler/algorithm/priorities/guest/lowload.go +++ b/pkg/scheduler/algorithm/priorities/guest/lowload.go @@ -3,6 +3,7 @@ package guest import ( "yunion.io/x/onecloud/pkg/scheduler/algorithm/priorities" "yunion.io/x/onecloud/pkg/scheduler/core" + "yunion.io/x/onecloud/pkg/scheduler/core/score" ) type LowLoadPriority struct { @@ -28,8 +29,12 @@ func (p *LowLoadPriority) Map(u *core.Unit, c core.Candidater) (core.HostPriorit cpuCommitRate := float64(hc.RunningCPUCount) / float64(hc.TotalCPUCount) memCommitRate := float64(hc.RunningMemSize) / float64(hc.TotalMemSize) if cpuCommitRate < 0.5 && memCommitRate < 0.5 { - score := 20 * (1 - cpuCommitRate - memCommitRate) + score := 10 * (1 - cpuCommitRate - memCommitRate) h.SetScore(int(score)) } return h.GetResult() } + +func (p *LowLoadPriority) ScoreIntervals() score.Intervals { + return score.NewIntervals(0, 1, 5) +} diff --git a/pkg/scheduler/algorithm/priorities/priorities.go b/pkg/scheduler/algorithm/priorities/priorities.go index 8b3560a0de..cc18857c11 100644 --- a/pkg/scheduler/algorithm/priorities/priorities.go +++ b/pkg/scheduler/algorithm/priorities/priorities.go @@ -1,23 +1,18 @@ package priorities import ( - "math" - "yunion.io/x/onecloud/pkg/scheduler/algorithm" "yunion.io/x/onecloud/pkg/scheduler/cache/candidate" "yunion.io/x/onecloud/pkg/scheduler/core" + "yunion.io/x/onecloud/pkg/scheduler/core/score" ) -func aggPriority(x float64) float64 { - return math.Log(x + math.Sqrt(x*x+1)) -} - // PriorityHelper is a struct that as a base interface for all priorities. type PriorityHelper struct { priority core.Priority unit *core.Unit Candidate core.Candidater - score int + score score.SScore err error } @@ -29,9 +24,38 @@ func NewPriorityHelper(p core.Priority, u *core.Unit, c core.Candidater) *Priori } } -func (h *PriorityHelper) SetScore(score int) { - h.score = score - h.unit.SetScore(h.Candidate.IndexKey(), h.priority.Name(), score) +func (h *PriorityHelper) setIntervalScore(val int) score.SScore { + h.score = score.NewScore( + h.priority.ScoreIntervals().ToScore(int64(val)), + h.priority.Name()) + return h.score +} + +func (h *PriorityHelper) setRawScore(val int) score.SScore { + h.score = score.NewScore( + score.TScore(val), + h.priority.Name()) + return h.score +} + +func (h *PriorityHelper) SetScore(val int) { + h.setIntervalScore(val) + h.unit.SetScore(h.Candidate.IndexKey(), h.score) +} + +func (h *PriorityHelper) SetFrontScore(val int) { + h.setIntervalScore(val) + h.unit.SetFrontScore(h.Candidate.IndexKey(), h.score) +} + +func (h *PriorityHelper) SetRawScore(val int) { + h.setRawScore(val) + h.unit.SetScore(h.Candidate.IndexKey(), h.score) +} + +func (h *PriorityHelper) SetFrontRawScore(val int) { + h.setRawScore(val) + h.unit.SetFrontScore(h.Candidate.IndexKey(), h.score) } func (h *PriorityHelper) SetError(err error) { @@ -41,7 +65,6 @@ func (h *PriorityHelper) SetError(err error) { func (h *PriorityHelper) GetResult() (core.HostPriority, error) { return core.HostPriority{ Host: h.Candidate.IndexKey(), - Score: h.score, Candidate: h.Candidate, }, h.err } @@ -71,3 +94,7 @@ func (b *BasePriority) Name() string { func (b *BasePriority) HostCandidate(c core.Candidater) (*candidate.HostDesc, error) { return algorithm.ToHostCandidate(c) } + +func (b *BasePriority) ScoreIntervals() score.Intervals { + return score.NewIntervals(0, 1, 2) +} diff --git a/pkg/scheduler/algorithmprovider/defaults.go b/pkg/scheduler/algorithmprovider/defaults.go index 6da42fc24b..2eec8ec4ed 100644 --- a/pkg/scheduler/algorithmprovider/defaults.go +++ b/pkg/scheduler/algorithmprovider/defaults.go @@ -31,7 +31,6 @@ func defaultPredicates() sets.String { func defaultPriorities() sets.String { return sets.NewString( - factory.RegisterPriority("guest-avoid-same-cluster", &priorityguest.AvoidSameClusterPriority{}, 1), factory.RegisterPriority("guest-avoid-same-host", &priorityguest.AvoidSameHostPriority{}, 1), factory.RegisterPriority("guest-lowload", &priorityguest.LowLoadPriority{}, 1), factory.RegisterPriority("guest-creating", &priorityguest.CreatingPriority{}, 1), diff --git a/pkg/scheduler/core/context.go b/pkg/scheduler/core/context.go index de1044dbdc..a0f4f30ff9 100644 --- a/pkg/scheduler/core/context.go +++ b/pkg/scheduler/core/context.go @@ -9,17 +9,15 @@ import ( "yunion.io/x/log" "yunion.io/x/onecloud/pkg/scheduler/api" + "yunion.io/x/onecloud/pkg/scheduler/core/score" ) const ( - EmptyScore int = 0x7FFFFFFFFFFFFFFF - BaseScore int = 10000 EmptyCapacity int64 = -1 MaxCapacity int64 = 0x7FFFFFFFFFFFFFFF ) var ( - EmptyScores = make(map[string]int) EmptyCapacities = make(map[string]Counter) ) @@ -186,8 +184,19 @@ type Capacity struct { } type Score struct { - Values map[string]int - Sum int + *score.ScoreBucket +} + +func newScore() *Score { + return &Score{ + ScoreBucket: score.NewScoreBuckets(), + } +} + +func newZeroScore() Score { + s := newScore() + s.Append(score.NewZeroScore()) + return *s } type SchedContextDataItem struct { @@ -484,7 +493,11 @@ func validateCapacityInput(c Counter) bool { return false } -func (u *Unit) SetScore(id, name string, score int) error { +type ScoreValue struct { + value score.TScore +} + +func (u *Unit) setScore(id string, val score.SScore, tofront bool) { u.scoreLock.Lock() defer u.scoreLock.Unlock() @@ -494,75 +507,45 @@ func (u *Unit) SetScore(id, name string, score int) error { ) if scoreObj, ok = u.ScoreMap[id]; !ok { - scoreObj = Score{Values: make(map[string]int), Sum: EmptyScore} + scoreObj = *newScore() u.ScoreMap[id] = scoreObj } - scoreObj.Values[name] = score - scoreObj.Sum = EmptyScore - - log.V(10).Infof("%q SetScore: %q -> %d", name, id, score) - return nil -} - -func (u *Unit) IncreaseScore(id string, name string, increase int) error { - - u.scoreLock.Lock() - defer u.scoreLock.Unlock() - - var ( - scoreObj Score - ok bool - ) - - score := int(0) - if scoreObj, ok = u.ScoreMap[id]; !ok { - scoreObj = Score{Values: make(map[string]int), Sum: EmptyScore} - u.ScoreMap[id] = scoreObj - score = increase + if tofront { + scoreObj.AddToFirst(val) } else { - if value, ok := scoreObj.Values[name]; ok { - score = value + increase - } else { - score = increase - } + scoreObj.SetScore(val) } - scoreObj.Values[name] = score - scoreObj.Sum = EmptyScore - - log.V(10).Infof("%q IncreaseScore: %q -> %d", name, id, score) - return nil + log.V(10).Infof("SetScore: %q -> %s", id, val.String()) } -func (u *Unit) GetScore(id string) int { +func (u *Unit) SetScore(id string, val score.SScore) { + u.setScore(id, val, false) +} + +func (u *Unit) SetFrontScore(id string, val score.SScore) { + u.setScore(id, val, true) +} + +func (u *Unit) GetScore(id string) Score { var ( scoreObj Score ok bool ) if scoreObj, ok = u.ScoreMap[id]; !ok { - return BaseScore + return *newScore() } - - if scoreObj.Sum == EmptyScore { - sum := int(0) - for _, value := range scoreObj.Values { - sum += value - } - - scoreObj.Sum = sum - } - - return scoreObj.Sum + BaseScore + return scoreObj } -func (u *Unit) GetScores(id string) map[string]int { - if scores, ok := u.ScoreMap[id]; ok { - return scores.Values +func (u *Unit) GetScoreDetails(id string) string { + if score, ok := u.ScoreMap[id]; ok { + return score.String() } - return EmptyScores + return "EmptyScore" } func (u *Unit) SetFiltedData(id string, name string, data interface{}) error { u.scoreLock.Lock() diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index bf02d7145f..f5533ef3e4 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -169,13 +169,13 @@ func newSchedResultByCtx(u *Unit, count int64, c Candidater) *SchedResultItem { Count: count, Capacity: u.GetCapacity(id), Name: fmt.Sprintf("%v", c.Get("Name")), - Score: u.GetScore(id), + Score: u.GetScore(id).DigitString(), Data: u.GetFiltedData(id, count), } if showDetails { r.CapacityDetails = GetCapacities(u, id) - r.ScoreDetails = u.GetScores(id) + r.ScoreDetails = u.GetScoreDetails(id) } return r } @@ -229,10 +229,10 @@ type SchedResultItem struct { Count int64 `json:"count"` Data map[string]interface{} `json:"data"` Capacity int64 `json:"capacity"` - Score int `json:"score"` + Score string `json:"score"` CapacityDetails map[string]int64 `json:"capacity_details"` - ScoreDetails map[string]int `json:"score_details"` + ScoreDetails string `json:"score_details"` } func GetCapacities(u *Unit, id string) (res map[string]int64) { @@ -246,22 +246,6 @@ func GetCapacities(u *Unit, id string) (res map[string]int64) { return } -func GetScore(u *Unit, id string, details bool) string { - score := u.GetScore(id) - s := fmt.Sprintf("%v", score) - if details { - scores := u.GetScores(id) - if len(scores) > 0 { - ss := []string{} - for name, score := range scores { - ss = append(ss, fmt.Sprintf("%v:%v", name, score)) - } - s += " (" + strings.Join(ss, ", ") + ")" - } - } - return s -} - type SchedResultItemList struct { Unit *Unit Data []*SchedResultItem @@ -325,16 +309,17 @@ func SelectHosts(unit *Unit, priorityList HostPriorityList) ([]*SelectedCandidat return nil, fmt.Errorf("SelectHosts get empty priorityList.") } - sort.Sort(sort.Reverse(priorityList)) - selectedMap := make(map[string]*SelectedCandidate) schedData := unit.SchedData() count := schedData.Count isSuggestion := unit.SchedInfo.IsSuggestion bestEffort := unit.SchedInfo.BestEffort selectedCandidates := []*SelectedCandidate{} + plugins := unit.AllSelectPlugins() + sort.Sort(sort.Reverse(priorityList)) + completed: for len(priorityList) > 0 { log.V(10).Debugf("PriorityList: %#v", priorityList) @@ -357,18 +342,8 @@ completed: } selectedItem.Count++ count-- - doPlugins := func() bool { - r := true - for _, plugin := range plugins { - if !plugin.OnSelect(unit, selectedItem.Candidate) { - r = false - } - } - return r - } - // if no one of plugins return false or capacity of the host large than - // selected count, this host can be added to priorityList. - if doPlugins() && unit.GetCapacity(hostID) > selectedItem.Count { + // if capacity of the host large than selected count, this host can be added to priorityList. + if unit.GetCapacity(hostID) > selectedItem.Count { priorityList0 = append(priorityList0, it) } } @@ -635,15 +610,23 @@ func PrioritizeCandidates( result := make(HostPriorityList, 0, len(candidates)) // TODO: Consider parallelizing it - for i := range candidates { - result = append(result, HostPriority{Host: candidates[i].IndexKey(), Score: 0, Candidate: candidates[i]}) - for j := range newPriorities { - result[i].Score += results[j][i].Score * newPriorities[j].Weight + // Do plugin priorities step + for _, candidate := range candidates { + for _, plugin := range unit.AllSelectPlugins() { + plugin.OnPriorityEnd(unit, candidate) } } + + for i, candidate := range candidates { + result = append(result, HostPriority{Host: candidates[i].IndexKey(), Score: *newScore(), Candidate: candidates[i]}) + //for j := range newPriorities { + //result[i].Score += results[j][i].Score * newPriorities[j].Weight + //} + result[i].Score = unit.GetScore(candidate.IndexKey()) + } if log.V(10) { for i := range result { - log.Infof("Host %s => Score %d", result[i].Host, result[i].Score) + log.Infof("Host %s => Score %s", result[i].Host, result[i].Score.DigitString()) } } return result, nil @@ -671,7 +654,7 @@ func EqualPriority(_ *Unit, candidate Candidater) (HostPriority, error) { } return HostPriority{ Host: indexKey, - Score: 1, + Score: newZeroScore(), Candidate: candidate, }, nil } diff --git a/pkg/scheduler/core/score/score.go b/pkg/scheduler/core/score/score.go new file mode 100644 index 0000000000..86c9475eaa --- /dev/null +++ b/pkg/scheduler/core/score/score.go @@ -0,0 +1,294 @@ +package score + +import ( + "container/list" + "fmt" + "math" + //"yunion.io/x/log" +) + +type TScore int + +const ( + MinScore TScore = -1 + ZeroScore TScore = 0 + MidScore TScore = 1 + MaxScore TScore = 2 + + ZeroScoreName = "zero" +) + +type SScore struct { + Score TScore + Name string +} + +func NewScore(score TScore, name string) SScore { + return SScore{ + Score: score, + Name: name, + } +} + +func NewMinScore(name string) SScore { + return NewScore(MinScore, name) +} + +func NewZeroScore() SScore { + return NewScore(ZeroScore, ZeroScoreName) +} + +func NewMidScore(name string) SScore { + return NewScore(MidScore, name) +} + +func NewMaxScore(name string) SScore { + return NewScore(MaxScore, name) +} + +func (v SScore) GetScore() TScore { + return v.Score +} + +func (v SScore) String() string { + return fmt.Sprintf("%s: %d", v.Name, v.Score) +} + +type Scores struct { + scores *list.List +} + +func newScores() *Scores { + return &Scores{ + scores: list.New(), + } +} + +func (s *Scores) Append(scores ...SScore) *Scores { + for _, score := range scores { + s.scores.PushBack(score) + } + return s +} + +func (s *Scores) AddToFirst(score SScore) *Scores { + s.scores.PushFront(score) + return s +} + +func (s *Scores) Range(iterFunc func(ele *list.Element, score SScore) bool) { + for ele := s.scores.Front(); ele != nil; ele = ele.Next() { + cont := iterFunc(ele, ele.Value.(SScore)) + if !cont { + break + } + } +} + +func (s *Scores) SetScore(score SScore) *Scores { + exists := false + rf := func(ele *list.Element, oscore SScore) bool { + if oscore.Name == score.Name { + exists = true + oscore.Score = score.Score + ele.Value = oscore + return false + } + return true + } + s.Range(rf) + if !exists { + s.Append(score) + } + return s +} + +func (s *Scores) AddScore(score SScore) *Scores { + exists := false + rf := func(ele *list.Element, oscore SScore) bool { + if oscore.Name == score.Name { + exists = true + oscore.Score += score.Score + ele.Value = oscore + return false + } + return true + } + s.Range(rf) + if !exists { + s.Append(score) + } + return s +} + +func (s *Scores) Len() int { + return s.scores.Len() +} + +func (s *Scores) GetScores() []SScore { + ret := make([]SScore, 0) + rf := func(_ *list.Element, score SScore) bool { + ret = append(ret, score) + return true + } + s.Range(rf) + return ret +} + +type ScoreBucket struct { + scores *Scores +} + +func NewScoreBuckets() *ScoreBucket { + return &ScoreBucket{ + scores: newScores(), + } +} + +func (b *ScoreBucket) AddToFirst(score SScore) *ScoreBucket { + b.scores.AddToFirst(score) + return b +} + +func (b *ScoreBucket) Append(scores ...SScore) *ScoreBucket { + b.scores.Append(scores...) + return b +} + +func (b *ScoreBucket) GetScores() []SScore { + return b.scores.GetScores() +} + +func (b *ScoreBucket) SetScore(score SScore) *ScoreBucket { + b.scores.SetScore(score) + return b +} + +func (b *ScoreBucket) AddScore(score SScore) *ScoreBucket { + b.scores.AddScore(score) + return b +} + +func (b *ScoreBucket) GetScore(scoreName string) (int, SScore) { + for i, oscore := range b.scores.GetScores() { + if oscore.Name == scoreName { + return i, oscore + } + } + return -1, SScore{} +} + +func (b *ScoreBucket) Len() int { + return b.scores.Len() +} + +func (b *ScoreBucket) DigitString() string { + s := "" + rf := func(_ *list.Element, score SScore) bool { + s = fmt.Sprintf("%s%d", s, score.Score) + return true + } + b.scores.Range(rf) + return s +} + +func extend(scores []SScore, length int) []SScore { + olen := len(scores) + if olen >= length { + return scores + } + ret := make([]SScore, 0) + zeroDigits := length - olen + for i := 0; i < zeroDigits; i++ { + ret = append(ret, NewZeroScore()) + } + ret = append(ret, scores...) + return ret +} + +func Equal(b1, b2 *ScoreBucket) bool { + return compare(b1, b2, func(s1, s2 TScore) bool { return s1 == s2 }) +} + +func Less(b1, b2 *ScoreBucket) bool { + return compare(b1, b2, func(s1, s2 TScore) bool { return s1 < s2 }) +} + +func compare(b1, b2 *ScoreBucket, cf func(s1, s2 TScore) bool) bool { + maxLen := int(math.Max(float64(b1.Len()), float64(b2.Len()))) + s1 := b1.GetScores() + s2 := b2.GetScores() + s1 = extend(s1, maxLen) + s2 = extend(s2, maxLen) + for i := range s1 { + v1 := s1[i].GetScore() + v2 := s2[i].GetScore() + ok := cf(v1, v2) + if ok { + return true + } else if !ok { + return false + } + } + return false +} + +func (b *ScoreBucket) debugString(vals []SScore, ret string) string { + if len(vals) == 0 { + return ret + } + restVal := vals[1:] + if len(restVal) == 0 { + return vals[0].String() + } + str := b.debugString(restVal, ret) + str = fmt.Sprintf("%s, %s", vals[0].String(), str) + return str +} + +func (b *ScoreBucket) String() string { + return b.debugString(b.GetScores(), "") +} + +type Interval struct { + start int64 + end int64 +} + +func NewInterval(start, end int64) *Interval { + return &Interval{start: start, end: end} +} + +func (i Interval) IsContain(val int64) bool { + return val >= i.start && val < i.end +} + +type Intervals struct { + MinInterval *Interval + ZeroInterval *Interval + MidInterval *Interval + MaxInterval *Interval +} + +func NewIntervals(min, zero, mid int64) Intervals { + return Intervals{ + MinInterval: NewInterval(math.MinInt64, min), + ZeroInterval: NewInterval(min, zero), + MidInterval: NewInterval(zero, mid), + MaxInterval: NewInterval(mid, math.MaxInt64), + } +} + +func (is Intervals) ToScore(val int64) TScore { + for score, interval := range map[TScore]*Interval{ + MinScore: is.MinInterval, + ZeroScore: is.ZeroInterval, + MidScore: is.MidInterval, + MaxScore: is.MaxInterval, + } { + if interval != nil && interval.IsContain(val) { + return score + } + } + return ZeroScore +} diff --git a/pkg/scheduler/core/score/score_test.go b/pkg/scheduler/core/score/score_test.go new file mode 100644 index 0000000000..b67c90d258 --- /dev/null +++ b/pkg/scheduler/core/score/score_test.go @@ -0,0 +1,156 @@ +package score + +import ( + "testing" +) + +func TestScoreBucket_String(t *testing.T) { + type fields struct { + scores *Scores + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "EmptyScores", + fields: fields{newScores()}, + want: "", + }, + { + name: "Scores100", + fields: fields{newScores().Append( + NewMidScore("mid"), + NewZeroScore(), + NewZeroScore(), + )}, + want: "mid: 1, zero: 0, zero: 0", + }, + { + name: "Scores201-1", + fields: fields{newScores().Append( + NewMaxScore("max"), + NewZeroScore(), + NewMidScore("mid"), + NewMinScore("min"), + )}, + want: "max: 2, zero: 0, mid: 1, min: -1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &ScoreBucket{ + scores: tt.fields.scores, + } + if got := b.String(); got != tt.want { + t.Errorf("ScoreBucket.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestLess(t *testing.T) { + type args struct { + b1 *ScoreBucket + b2 *ScoreBucket + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "equal", + args: args{ + b1: NewScoreBuckets(), + b2: NewScoreBuckets(), + }, + want: false, + }, + { + name: "extendEqual", + args: args{ + b1: NewScoreBuckets().Append( + NewZeroScore(), NewMidScore("1"), + ), + b2: NewScoreBuckets().Append(NewMidScore("1")), + }, + want: false, + }, + { + name: "10<100", + args: args{ + b1: NewScoreBuckets().Append( + NewMidScore("1"), + NewZeroScore(), + ), + b2: NewScoreBuckets().Append( + NewMidScore("1"), + NewZeroScore(), + NewZeroScore(), + ), + }, + want: true, + }, + { + name: "101>10", + args: args{ + b1: NewScoreBuckets().Append( + NewMidScore("1"), + NewZeroScore(), + NewMidScore("1"), + ), + b2: NewScoreBuckets().Append( + NewMidScore("1"), + NewZeroScore(), + ), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Less(tt.args.b1, tt.args.b2); got != tt.want { + t.Errorf("Less() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestScoreBucket_DigitString(t *testing.T) { + type fields struct { + scores *Scores + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "2-101", + fields: fields{newScores().Append( + NewMaxScore(""), + NewMinScore(""), + NewZeroScore(), + NewMidScore(""), + )}, + want: "2-101", + }, + { + name: "empty", + fields: fields{newScores()}, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &ScoreBucket{ + scores: tt.fields.scores, + } + if got := b.DigitString(); got != tt.want { + t.Errorf("ScoreBucket.DigitString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/scheduler/core/types.go b/pkg/scheduler/core/types.go index 7ca0459fc8..7deb8724b4 100644 --- a/pkg/scheduler/core/types.go +++ b/pkg/scheduler/core/types.go @@ -1,15 +1,14 @@ package core import ( - //"sync" - //"yunion.io/x/onecloud/pkg/scheduler/cache/candidate" "yunion.io/x/jsonutils" + "yunion.io/x/onecloud/pkg/scheduler/core/score" "yunion.io/x/onecloud/pkg/scheduler/db/models" ) const ( - PriorityStep int = 100 + PriorityStep int = 1 ) type FailedCandidate struct { @@ -23,7 +22,8 @@ type FailedCandidates struct { } type SelectPlugin interface { - OnSelect(*Unit, Candidater) bool + Name() string + OnPriorityEnd(*Unit, Candidater) OnSelectEnd(u *Unit, c Candidater, count int64) } @@ -58,7 +58,7 @@ type HostPriority struct { // Name of the host Host string // Score associated with the host - Score int + Score Score // Resource wraps Candidate host info Candidate Candidater } @@ -70,10 +70,10 @@ func (h HostPriorityList) Len() int { } func (h HostPriorityList) Less(i, j int) bool { - if h[i].Score == h[j].Score { + if score.Equal(h[i].Score.ScoreBucket, h[j].Score.ScoreBucket) { return h[i].Host < h[j].Host } - return h[i].Score < h[j].Score + return score.Less(h[i].Score.ScoreBucket, h[j].Score.ScoreBucket) } func (h HostPriorityList) Swap(i, j int) { @@ -115,4 +115,7 @@ type Priority interface { Map(*Unit, Candidater) (HostPriority, error) Reduce(*Unit, []Candidater, HostPriorityList) error PreExecute(*Unit, []Candidater) (bool, []PredicateFailureReason, error) + + // Score intervals + ScoreIntervals() score.Intervals } diff --git a/pkg/util/hashcache/doc.go b/pkg/util/hashcache/doc.go new file mode 100644 index 0000000000..50f6faf063 --- /dev/null +++ b/pkg/util/hashcache/doc.go @@ -0,0 +1 @@ +package hashcache // import "yunion.io/x/onecloud/pkg/util/hashcache"