mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-06-07 23:09:17 +08:00
scheduler: dynamicschedtags support
This commit is contained in:
@@ -3,10 +3,13 @@ package predicates
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"yunion.io/x/jsonutils"
|
||||
"yunion.io/x/log"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/scheduler/api"
|
||||
"yunion.io/x/onecloud/pkg/scheduler/core"
|
||||
"yunion.io/x/onecloud/pkg/scheduler/db/models"
|
||||
"yunion.io/x/onecloud/pkg/util/conditionparser"
|
||||
)
|
||||
|
||||
// NOTE: Aggregate Description
|
||||
@@ -42,11 +45,73 @@ func (p *AggregatePredicate) Clone() core.FitPredicate {
|
||||
}
|
||||
}
|
||||
|
||||
func hostsAggregatesInfo(cs []core.Candidater) (hostsAggregatesMap, []*models.Aggregate) {
|
||||
func getHostAndServerSchedDesc(u *core.Unit, c core.Candidater) *jsonutils.JSONDict {
|
||||
ret := jsonutils.NewDict()
|
||||
hostSchedDesc := c.GetSchedDesc()
|
||||
srvSchedDesc := jsonutils.Marshal(u.SchedData())
|
||||
ret.Add(hostSchedDesc, "host")
|
||||
ret.Add(srvSchedDesc, "server")
|
||||
return ret
|
||||
}
|
||||
|
||||
func getHostDynamicSchedtags(u *core.Unit, c core.Candidater) ([]*models.Aggregate, error) {
|
||||
schedDesc := getHostAndServerSchedDesc(u, c)
|
||||
|
||||
dynamicTags, err := models.FetchEnabledDynamicschedtags()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
aggs := []*models.Aggregate{}
|
||||
for _, tag := range dynamicTags {
|
||||
matched, err := conditionparser.Eval(tag.Condition, schedDesc)
|
||||
if err != nil {
|
||||
log.Errorf("Condition parse eval: condition: %q, desc: %s, error: %v", tag.Condition, schedDesc, err)
|
||||
continue
|
||||
}
|
||||
if !matched {
|
||||
continue
|
||||
}
|
||||
aggregate, err := tag.FetchSchedTag()
|
||||
if err != nil {
|
||||
log.Errorf("Get dynamic schedtag %q error: %v", tag.SchedtagId, err)
|
||||
continue
|
||||
}
|
||||
aggs = append(aggs, aggregate)
|
||||
}
|
||||
return aggs, nil
|
||||
}
|
||||
|
||||
func mergeHostSchedtags(c core.Candidater, staticTags, dynamicTags []*models.Aggregate) []*models.Aggregate {
|
||||
isIn := func(tags []*models.Aggregate, dt *models.Aggregate) bool {
|
||||
for _, t := range tags {
|
||||
if t.ID == dt.ID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
ret := []*models.Aggregate{}
|
||||
ret = append(ret, staticTags...)
|
||||
for _, dt := range dynamicTags {
|
||||
if !isIn(staticTags, dt) {
|
||||
ret = append(ret, dt)
|
||||
log.Debugf("Append dynamic schedtag %s to host %q", dt, c.IndexKey())
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func hostsAggregatesInfo(u *core.Unit, cs []core.Candidater) (hostsAggregatesMap, []*models.Aggregate) {
|
||||
ret := make(map[string]hostAggregates, 0)
|
||||
allAggs := make([]*models.Aggregate, 0)
|
||||
for _, c := range cs {
|
||||
hostAggs := c.GetHostAggregates()
|
||||
dynamicMatchedAggs, err := getHostDynamicSchedtags(u, c)
|
||||
if err != nil {
|
||||
log.Errorf("Get host %q dynamic schedtag error: %v", c.IndexKey(), err)
|
||||
} else {
|
||||
hostAggs = mergeHostSchedtags(c, hostAggs, dynamicMatchedAggs)
|
||||
}
|
||||
ret[c.IndexKey()] = hostAggs
|
||||
}
|
||||
if len(cs) > 0 {
|
||||
@@ -62,7 +127,7 @@ func (p *AggregatePredicate) PreExecute(u *core.Unit, cs []core.Candidater) (boo
|
||||
return false, nil
|
||||
}
|
||||
|
||||
hsMap, allAggs := hostsAggregatesInfo(cs)
|
||||
hsMap, allAggs := hostsAggregatesInfo(u, cs)
|
||||
p.AggregateHosts = hsMap
|
||||
appendedAggIds := make(map[string]int, len(data.Aggregates))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user