Files
cloudpods/pkg/scheduler/algorithm/predicates/predicates.go
2022-04-05 11:29:30 +08:00

653 lines
19 KiB
Go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package predicates
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/gotypes"
"yunion.io/x/pkg/util/errors"
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/scheduler/algorithm/plugin"
"yunion.io/x/onecloud/pkg/scheduler/api"
"yunion.io/x/onecloud/pkg/scheduler/core"
"yunion.io/x/onecloud/pkg/scheduler/core/score"
)
// BasePredicate is a default struct for all the predicates that will
// include it and implement it's Name() and PreExecute() methods.
type BasePredicate struct{}
func (b *BasePredicate) Name() string {
return "base_predicate_should_not_be_called"
}
func (b *BasePredicate) PreExecute(ctx context.Context, unit *core.Unit, candis []core.Candidater) (bool, error) {
return true, nil
}
func (b *BasePredicate) GetHypervisorDriver(u *core.Unit) models.IGuestDriver {
return models.GetDriver(u.GetHypervisor())
}
type PredicateHelper struct {
predicate core.FitPredicate
predicateFails []core.PredicateFailureReason
capacity int64
Unit *core.Unit
Candidate core.Candidater
}
func (h *PredicateHelper) getResult() (bool, []core.PredicateFailureReason, error) {
if len(h.predicateFails) > 0 {
return false, h.predicateFails, nil
}
if h.capacity == 0 {
return false, []core.PredicateFailureReason{}, nil
}
return true, nil, nil
}
func (h *PredicateHelper) GetResult() (bool, []core.PredicateFailureReason, error) {
ok, reasons, err := h.getResult()
if !ok {
log.Warningf("[Filter Result] candidate: %q, filter: %q, is_ok: %v, reason: %q, error: %v\n",
h.Candidate.IndexKey(), h.predicate.Name(), ok, getReasonsString(reasons), err)
}
return ok, reasons, err
}
func getReasonsString(reasons []core.PredicateFailureReason) string {
if len(reasons) == 0 {
return ""
}
ss := make([]string, 0, len(reasons))
for _, reason := range reasons {
ss = append(ss, reason.GetReason())
}
return strings.Join(ss, ", ")
}
func NewPredicateHelper(pre core.FitPredicate, unit *core.Unit, candi core.Candidater) *PredicateHelper {
h := &PredicateHelper{
predicate: pre,
capacity: core.EmptyCapacity,
predicateFails: []core.PredicateFailureReason{},
Unit: unit,
Candidate: candi,
}
return h
}
func (h *PredicateHelper) GetFailedResult(err error) (bool, []core.PredicateFailureReason, error) {
return false, nil, err
}
func (h *PredicateHelper) AppendPredicateFail(reason core.PredicateFailureReason) {
h.predicateFails = append(h.predicateFails, reason)
}
type predicateFailure struct {
err core.PredicateFailureError
eType string
}
func (f predicateFailure) GetReason() string {
return f.err.GetReason()
}
func (f predicateFailure) GetType() string {
return f.eType
}
func (h *PredicateHelper) AppendPredicateFailMsg(reason string) {
h.AppendPredicateFailMsgWithType(reason, h.predicate.Name())
}
func (h *PredicateHelper) AppendPredicateFailMsgWithType(reason string, eType string) {
err := NewUnexceptedResourceError(reason)
h.AppendPredicateFail(&predicateFailure{err: err, eType: eType})
}
func (h *PredicateHelper) AppendInsufficientResourceError(req, total, free int64) {
h.AppendPredicateFail(
&predicateFailure{
err: NewInsufficientResourceError(h.Candidate.Getter().Name(), req, total, free),
eType: h.predicate.Name(),
})
}
// SetCapacity returns the current resource capacity calculated by a filter.
// And 'capacity' default is -1.
func (h *PredicateHelper) SetCapacity(capacity int64) {
if capacity < 0 {
capacity = 0
}
h.SetCapacityCounter(core.NewNormalCounter(capacity))
}
func (h *PredicateHelper) SetCapacityCounter(counter core.Counter) {
capacity := counter.GetCount()
if capacity < core.EmptyCapacity {
capacity = core.EmptyCapacity
}
h.capacity = capacity
h.Unit.SetCapacity(h.Candidate.IndexKey(), h.predicate.Name(), counter)
}
func (h *PredicateHelper) SetSelectPriority(sp int) {
if sp < 0 {
sp = 0
}
h.Unit.SetSelectPriorityWithLock(h.Candidate.IndexKey(), h.predicate.Name(), core.SSelectPriorityValue(sp))
}
func (h *PredicateHelper) Exclude(reason string) {
h.SetCapacity(0)
h.AppendPredicateFailMsg(reason)
}
func (h *PredicateHelper) ExcludeByErrors(errs []core.PredicateFailureReason) {
h.SetCapacity(0)
for _, err := range errs {
h.AppendPredicateFail(err)
}
}
func (h *PredicateHelper) Exclude2(predicateName string, current, expected interface{}) {
h.Exclude(fmt.Sprintf("%s is '%v', expected '%v'", predicateName, current, expected))
}
// UseReserved check whether the unit can use guest reserved resource
func (h *PredicateHelper) UseReserved() bool {
usable := false
data := h.Unit.SchedData()
isoDevs := data.IsolatedDevices
if len(isoDevs) > 0 || (data.ChangeConfig && data.HasIsolatedDevice) {
usable = true
}
return usable
}
type PredicatedSchedtagResource struct {
ISchedtagCandidateResource
PreferTags []computeapi.SchedtagConfig
AvoidTags []computeapi.SchedtagConfig
}
type SchedtagInputResourcesMap map[int][]*PredicatedSchedtagResource
func (m SchedtagInputResourcesMap) getAllTags(isPrefer bool) []computeapi.SchedtagConfig {
ret := make([]computeapi.SchedtagConfig, 0)
for _, ss := range m {
for _, s := range ss {
var tags []computeapi.SchedtagConfig
if isPrefer {
tags = s.PreferTags
} else {
tags = s.AvoidTags
}
ret = append(ret, tags...)
}
}
return ret
}
func (m SchedtagInputResourcesMap) GetPreferTags() []computeapi.SchedtagConfig {
return m.getAllTags(true)
}
func (m SchedtagInputResourcesMap) GetAvoidTags() []computeapi.SchedtagConfig {
return m.getAllTags(false)
}
type CandidateInputResourcesMap struct {
*sync.Map // map[string]SchedtagInputResourcesMap
}
type ISchedtagCandidateResource interface {
GetName() string
GetId() string
Keyword() string
GetSchedtags() []models.SSchedtag
GetSchedtagJointManager() models.ISchedtagJointManager
GetDynamicConditionInput() *jsonutils.JSONDict
}
type ISchedtagPredicateInstance interface {
core.FitPredicate
OnPriorityEnd(u *core.Unit, c core.Candidater)
OnSelectEnd(u *core.Unit, c core.Candidater, count int64)
GetInputs(u *core.Unit) []ISchedtagCustomer
GetResources(c core.Candidater) []ISchedtagCandidateResource
IsResourceMatchInput(ctx context.Context, input ISchedtagCustomer, res ISchedtagCandidateResource) bool
IsResourceFitInput(ctx context.Context, unit *core.Unit, c core.Candidater, res ISchedtagCandidateResource, input ISchedtagCustomer) core.PredicateFailureReason
DoSelect(c core.Candidater, input ISchedtagCustomer, res []ISchedtagCandidateResource) []ISchedtagCandidateResource
AddSelectResult(index int, input ISchedtagCustomer, selectRes []ISchedtagCandidateResource, output *core.AllocatedResource)
GetCandidateResourceSortScore(candidate ISchedtagCandidateResource) int64
}
// Schedtag Description
// require: Must be scheduled to the specified tag resource
// prefer: Priority to the specified resource
// avoid: Try to avoid scheduling to the specified resource
// exclude: Do not allow scheduling on the specified resource
type BaseSchedtagPredicate struct {
BasePredicate
plugin.BasePlugin
CandidateInputResources *CandidateInputResourcesMap
Hypervisor string
}
func NewBaseSchedtagPredicate() *BaseSchedtagPredicate {
return &BaseSchedtagPredicate{
CandidateInputResources: &CandidateInputResourcesMap{Map: new(sync.Map)}, // make(map[string]SchedtagInputResourcesMap),
}
}
func (p *PredicatedSchedtagResource) isNoTag() bool {
return len(p.PreferTags) == 0 && len(p.AvoidTags) == 0
}
func (p *PredicatedSchedtagResource) hasPreferTags() bool {
return len(p.PreferTags) != 0
}
func (p *PredicatedSchedtagResource) hasAvoidTags() bool {
return len(p.AvoidTags) != 0
}
type ISchedtagCustomer interface {
// JSON(interface{}) *jsonutils.JSONDict
GetDynamicConditionInput() *jsonutils.JSONDict
Keyword() string
IsSpecifyResource() bool
GetSchedtags() []*computeapi.SchedtagConfig
ResourceKeyword() string
}
type SchedtagResourceW struct {
candidater ISchedtagCandidateResource
input ISchedtagCustomer
}
func (w SchedtagResourceW) IndexKey() string {
return fmt.Sprintf("%s:%s", w.candidater.GetName(), w.candidater.GetId())
}
func (w SchedtagResourceW) ResourceType() string {
return getSchedtagResourceType(w.candidater)
}
func getSchedtagResourceType(candidater ISchedtagCandidateResource) string {
return candidater.GetSchedtagJointManager().GetMasterManager().KeywordPlural()
}
func (w SchedtagResourceW) GetSchedtags() []models.SSchedtag {
return w.candidater.GetSchedtags()
}
func (w SchedtagResourceW) GetDynamicSchedDesc() *jsonutils.JSONDict {
ret := jsonutils.NewDict()
resSchedDesc := w.candidater.GetDynamicConditionInput()
inputSchedDesc := w.input.GetDynamicConditionInput()
ret.Add(resSchedDesc, w.candidater.Keyword())
ret.Add(inputSchedDesc, w.input.Keyword())
return ret
}
func (p *BaseSchedtagPredicate) GetHypervisorDriver() models.IGuestDriver {
return models.GetDriver(p.Hypervisor)
}
func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedtagCandidateResource, u *core.Unit, c core.Candidater) (*PredicatedSchedtagResource, error) {
allTags, err := GetAllSchedtags(getSchedtagResourceType(candidate))
if err != nil {
return nil, err
}
tagPredicate := NewSchedtagPredicate(input.GetSchedtags(), allTags)
res := &PredicatedSchedtagResource{
ISchedtagCandidateResource: candidate,
}
if !input.IsSpecifyResource() {
if err := tagPredicate.Check(
SchedtagResourceW{
candidater: candidate,
input: input,
},
); err != nil {
return nil, err
}
res.PreferTags = tagPredicate.GetPreferTags()
res.AvoidTags = tagPredicate.GetAvoidTags()
}
return res, nil
}
func (p *BaseSchedtagPredicate) checkResources(input ISchedtagCustomer, ress []ISchedtagCandidateResource, u *core.Unit, c core.Candidater) ([]*PredicatedSchedtagResource, error) {
errs := make([]error, 0)
ret := make([]*PredicatedSchedtagResource, 0)
for _, res := range ress {
ps, err := p.check(input, res, u, c)
if err != nil {
// append err, resource not suit input customer
errs = append(errs, err)
continue
}
ret = append(ret, ps)
}
if len(ret) == 0 {
return nil, errors.NewAggregate(errs)
}
return ret, nil
}
func (p *BaseSchedtagPredicate) GetInputResourcesMap(candidateId string) SchedtagInputResourcesMap {
ret, ok := p.CandidateInputResources.Load(candidateId)
if !ok {
ret = make(map[int][]*PredicatedSchedtagResource)
p.CandidateInputResources.Store(candidateId, ret)
}
return ret.(map[int][]*PredicatedSchedtagResource)
}
func (p *BaseSchedtagPredicate) PreExecute(ctx context.Context, sp ISchedtagPredicateInstance, u *core.Unit, cs []core.Candidater) (bool, error) {
input := sp.GetInputs(u)
if len(input) == 0 {
return false, nil
}
p.Hypervisor = u.GetHypervisor()
// always do select step
u.AppendSelectPlugin(sp)
return true, nil
}
func (p *BaseSchedtagPredicate) Execute(
ctx context.Context,
sp ISchedtagPredicateInstance,
u *core.Unit,
c core.Candidater,
) (bool, []core.PredicateFailureReason, error) {
inputs := sp.GetInputs(u)
resources := sp.GetResources(c)
h := NewPredicateHelper(sp, u, c)
inputRes := p.GetInputResourcesMap(c.IndexKey())
filterErrs := make([]core.PredicateFailureReason, 0)
for idx, input := range inputs {
fitResources := make([]ISchedtagCandidateResource, 0)
errs := make([]core.PredicateFailureReason, 0)
matchedRes := make([]ISchedtagCandidateResource, 0)
for _, r := range resources {
if sp.IsResourceMatchInput(ctx, input, r) {
matchedRes = append(matchedRes, r)
}
}
if len(matchedRes) == 0 {
errs = append(errs, &FailReason{
Reason: fmt.Sprintf("Not found matched %s, candidate: %s, %s: %s", input.ResourceKeyword(), c.Getter().Name(), input.Keyword(), input.GetDynamicConditionInput()),
Type: fmt.Sprintf("%s_match", input.ResourceKeyword()),
})
}
for _, res := range matchedRes {
if err := sp.IsResourceFitInput(ctx, u, c, res, input); err == nil {
fitResources = append(fitResources, res)
} else {
errs = append(errs, err)
}
}
if len(fitResources) == 0 {
h.ExcludeByErrors(errs)
break
}
if len(errs) > 0 {
filterErrs = append(filterErrs, errs...)
}
matchedResources, err := p.checkResources(input, fitResources, u, c)
if err != nil {
if len(filterErrs) > 0 {
h.ExcludeByErrors(filterErrs)
}
errMsg := fmt.Sprintf("schedtag: %v", err.Error())
h.Exclude(errMsg)
}
inputRes[idx] = matchedResources
}
return h.GetResult()
}
func SetCandidateScoreBySchedtag(u *core.Unit, c core.Candidater, aggCountMap map[string]int, prefer bool) {
stepScore := core.PriorityStep
doSet := u.SetPreferScore
if !prefer {
doSet = u.SetAvoidScore
}
for n, count := range aggCountMap {
doSet(c.IndexKey(), score.NewScore(score.TScore(count*stepScore), n))
}
}
func (p *BaseSchedtagPredicate) OnPriorityEnd(sp ISchedtagPredicateInstance, u *core.Unit, c core.Candidater) {
resTags := []models.SSchedtag{}
for _, res := range sp.GetResources(c) {
resTags = append(resTags, res.GetSchedtags()...)
}
inputRes := p.GetInputResourcesMap(c.IndexKey())
avoidTags := inputRes.GetAvoidTags()
preferTags := inputRes.GetPreferTags()
avoidCountMap := GetSchedtagCount(avoidTags, resTags, api.AggregateStrategyAvoid)
preferCountMap := GetSchedtagCount(preferTags, resTags, api.AggregateStrategyPrefer)
setScore := SetCandidateScoreBySchedtag
setScore(u, c, preferCountMap, true)
setScore(u, c, avoidCountMap, false)
}
func (p *BaseSchedtagPredicate) OnSelectEnd(sp ISchedtagPredicateInstance, u *core.Unit, c core.Candidater, count int64) {
inputRes := p.GetInputResourcesMap(c.IndexKey())
output := u.GetAllocatedResource(c.IndexKey())
inputs := sp.GetInputs(u)
idxKeys := []int{}
// inputRes is unorder map, sorted it
for idx := range inputRes {
idxKeys = append(idxKeys, idx)
}
sort.Ints(idxKeys)
for idx := range idxKeys {
res := inputRes[idx]
selRes := p.selectResource(sp, c, inputs[idx], res)
sortRes := newSortCandidateResource(sp, selRes)
sort.Sort(sortRes)
sp.AddSelectResult(idx, inputs[idx], sortRes.res, output)
}
}
type sortCandidateResource struct {
predicate ISchedtagPredicateInstance
res []ISchedtagCandidateResource
}
func newSortCandidateResource(predicate ISchedtagPredicateInstance, res []ISchedtagCandidateResource) *sortCandidateResource {
return &sortCandidateResource{
predicate: predicate,
res: res,
}
}
func (s *sortCandidateResource) Len() int {
return len(s.res)
}
func (s *sortCandidateResource) DebugString() string {
var debugStr string
for _, i := range s.res {
debugStr = fmt.Sprintf("%s %d", debugStr, s.predicate.GetCandidateResourceSortScore(i))
}
return debugStr
}
// desc order
func (s *sortCandidateResource) Less(i, j int) bool {
res1, res2 := s.res[i], s.res[j]
v1 := s.predicate.GetCandidateResourceSortScore(res1)
v2 := s.predicate.GetCandidateResourceSortScore(res2)
return v1 > v2
}
func (s *sortCandidateResource) Swap(i, j int) {
s.res[i], s.res[j] = s.res[j], s.res[i]
}
func (p *BaseSchedtagPredicate) selectResource(
sp ISchedtagPredicateInstance,
c core.Candidater,
input ISchedtagCustomer,
ress []*PredicatedSchedtagResource,
) []ISchedtagCandidateResource {
preferRes := make([]ISchedtagCandidateResource, 0)
noTagRes := make([]ISchedtagCandidateResource, 0)
avoidRes := make([]ISchedtagCandidateResource, 0)
for _, res := range ress {
if res.isNoTag() {
noTagRes = append(noTagRes, res.ISchedtagCandidateResource)
} else if res.hasPreferTags() {
preferRes = append(preferRes, res.ISchedtagCandidateResource)
} else if res.hasAvoidTags() {
avoidRes = append(avoidRes, res.ISchedtagCandidateResource)
}
}
for _, ress := range [][]ISchedtagCandidateResource{
preferRes,
noTagRes,
avoidRes,
} {
if len(ress) == 0 {
continue
}
if ret := sp.DoSelect(c, input, ress); ret != nil {
return ret
}
}
return nil
}
type iServerBaseSchedtagPredicate interface {
ISchedtagPredicateInstance
GetCandidateResource(core.Candidater) ISchedtagCandidateResource
}
type ServerBaseSchedtagPredicate struct {
*BaseSchedtagPredicate
filter iServerBaseSchedtagPredicate
}
func NewServerBaseSchedtagPredicate(filter iServerBaseSchedtagPredicate) *ServerBaseSchedtagPredicate {
return &ServerBaseSchedtagPredicate{
BaseSchedtagPredicate: NewBaseSchedtagPredicate(),
filter: filter,
}
}
func (p *ServerBaseSchedtagPredicate) PreExecute(ctx context.Context, u *core.Unit, cs []core.Candidater) (bool, error) {
return p.BaseSchedtagPredicate.PreExecute(ctx, p.filter, u, cs)
}
func (p *ServerBaseSchedtagPredicate) Execute(ctx context.Context, u *core.Unit, c core.Candidater) (bool, []core.PredicateFailureReason, error) {
return p.BaseSchedtagPredicate.Execute(ctx, p.filter, u, c)
}
func (p *ServerBaseSchedtagPredicate) GetResources(c core.Candidater) []ISchedtagCandidateResource {
res := p.filter.GetCandidateResource(c)
if res == nil || gotypes.IsNil(res) {
return nil
}
return []ISchedtagCandidateResource{
res,
}
}
func (p *ServerBaseSchedtagPredicate) IsResourceMatchInput(ctx context.Context, input ISchedtagCustomer, res ISchedtagCandidateResource) bool {
return true
}
func (p *ServerBaseSchedtagPredicate) IsResourceFitInput(ctx context.Context, u *core.Unit, c core.Candidater, res ISchedtagCandidateResource, input ISchedtagCustomer) core.PredicateFailureReason {
return nil
}
func (p *ServerBaseSchedtagPredicate) DoSelect(
c core.Candidater,
input ISchedtagCustomer,
res []ISchedtagCandidateResource,
) []ISchedtagCandidateResource {
return res
}
func (p *ServerBaseSchedtagPredicate) AddSelectResult(index int, input ISchedtagCustomer, selectRes []ISchedtagCandidateResource, output *core.AllocatedResource) {
// resource is host, do nothing
}
func (p *ServerBaseSchedtagPredicate) GetCandidateResourceSortScore(selectRes ISchedtagCandidateResource) int64 {
// TODO
return 1
}
func (p *ServerBaseSchedtagPredicate) OnPriorityEnd(u *core.Unit, c core.Candidater) {
p.BaseSchedtagPredicate.OnPriorityEnd(p.filter, u, c)
}
func (p *ServerBaseSchedtagPredicate) OnSelectEnd(u *core.Unit, c core.Candidater, count int64) {
p.BaseSchedtagPredicate.OnSelectEnd(p.filter, u, c, count)
}