mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-07-01 02:24:47 +08:00
fix(scheduler,region): fill resource info check guest is in pending usage (#21675)
This commit is contained in:
@@ -90,6 +90,9 @@ type ScheduleInput struct {
|
||||
// For Migrate
|
||||
CpuNumaPin []SCpuNumaPin `json:"cpu_numa_pin"`
|
||||
|
||||
// GuestIds
|
||||
GuestIds []string `json:"guest_ids"`
|
||||
|
||||
HostMemPageSizeKB int `json:"host_mem_page_size"`
|
||||
SkipKernelCheck *bool `json:"skip_kernel_check"`
|
||||
TargetHostKernel string `json:"target_host_kernel"`
|
||||
|
||||
@@ -141,6 +141,12 @@ func doScheduleObjects(
|
||||
return
|
||||
}
|
||||
|
||||
sort.Sort(sortedIScheduleModelList(objs))
|
||||
schedInput.GuestIds = make([]string, len(objs))
|
||||
for i := range objs {
|
||||
schedInput.GuestIds[i] = objs[i].GetId()
|
||||
}
|
||||
|
||||
output, err := doScheduleWithInput(ctx, task, schedInput, len(objs))
|
||||
if err != nil {
|
||||
onSchedulerRequestFail(ctx, task, objs, jsonutils.NewString(err.Error()))
|
||||
@@ -202,7 +208,6 @@ func onSchedulerResults(
|
||||
task.SaveScheduleResult(ctx, nil, results[0], 0)
|
||||
return
|
||||
}
|
||||
sort.Sort(sortedIScheduleModelList(objs))
|
||||
succCount := 0
|
||||
for idx := 0; idx < len(objs); idx += 1 {
|
||||
obj := objs[idx]
|
||||
|
||||
41
pkg/scheduler/cache/candidate/hosts.go
vendored
41
pkg/scheduler/cache/candidate/hosts.go
vendored
@@ -1343,6 +1343,7 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
|
||||
guestsOnHost = append(guestsOnHost, backupGuestsOnHost...)
|
||||
}
|
||||
|
||||
pendingUsage := desc.GetPendingUsage()
|
||||
desc.Tenants = make(map[string]int64)
|
||||
for _, gst := range guestsOnHost {
|
||||
guest := gst.(computemodels.SGuest)
|
||||
@@ -1352,25 +1353,33 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
|
||||
} else {
|
||||
desc.Tenants[projectId] = 1
|
||||
}
|
||||
if IsGuestRunning(guest) {
|
||||
runningCount++
|
||||
memSize += int64(guest.VmemSize)
|
||||
cpuCount += int64(guest.VcpuCount)
|
||||
if guest.CpuNumaPin != nil {
|
||||
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
|
||||
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
|
||||
return errors.Wrap(err, "unmarshal cpu numa pin")
|
||||
}
|
||||
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
|
||||
}
|
||||
} else if IsGuestCreating(guest) {
|
||||
creatingGuestCount++
|
||||
creatingMemSize += int64(guest.VmemSize)
|
||||
creatingCPUCount += int64(guest.VcpuCount)
|
||||
} else if IsGuestPendingDelete(guest) {
|
||||
|
||||
if IsGuestPendingDelete(guest) {
|
||||
memFakeDeletedSize += int64(guest.VmemSize)
|
||||
cpuFakeDeletedCount += int64(guest.VcpuCount)
|
||||
} else {
|
||||
if _, ok := pendingUsage.PendingGuestIds[guest.Id]; ok {
|
||||
log.Infof("fillGuestsResourceInfo guest %s in pending usage", guest.Id)
|
||||
continue
|
||||
}
|
||||
if IsGuestRunning(guest) {
|
||||
runningCount++
|
||||
memSize += int64(guest.VmemSize)
|
||||
cpuCount += int64(guest.VcpuCount)
|
||||
if guest.CpuNumaPin != nil {
|
||||
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
|
||||
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
|
||||
return errors.Wrap(err, "unmarshal cpu numa pin")
|
||||
}
|
||||
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
|
||||
}
|
||||
} else if IsGuestCreating(guest) {
|
||||
creatingGuestCount++
|
||||
creatingMemSize += int64(guest.VmemSize)
|
||||
creatingCPUCount += int64(guest.VcpuCount)
|
||||
}
|
||||
}
|
||||
|
||||
guestCount++
|
||||
cpuReqCount += int64(guest.VcpuCount)
|
||||
memReqSize += int64(guest.VmemSize)
|
||||
|
||||
@@ -132,8 +132,16 @@ func setSchedPendingUsage(driver computemodels.IGuestDriver, req *api.SchedInfo,
|
||||
if req.IsSuggestion || IsDriverSkipScheduleDirtyMark(driver) {
|
||||
return nil
|
||||
}
|
||||
for _, item := range resp.Candidates {
|
||||
schedmodels.HostPendingUsageManager.AddPendingUsage(req, item)
|
||||
for i, item := range resp.Candidates {
|
||||
if item.Error != "" {
|
||||
// schedule failed skip add pending usage
|
||||
continue
|
||||
}
|
||||
var guestId string
|
||||
if len(req.GuestIds) > i {
|
||||
guestId = req.GuestIds[i]
|
||||
}
|
||||
schedmodels.HostPendingUsageManager.AddPendingUsage(guestId, req, item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -50,15 +50,11 @@ func (m *SHostPendingUsageManager) Keyword() string {
|
||||
}
|
||||
|
||||
func (m *SHostPendingUsageManager) newSessionUsage(req *api.SchedInfo, hostId string, candidate *schedapi.CandidateResource) *SessionPendingUsage {
|
||||
su := NewSessionUsage(req.SessionId, hostId)
|
||||
su.Usage = NewPendingUsageBySchedInfo(hostId, req, candidate)
|
||||
usage := NewPendingUsageBySchedInfo(hostId, req, candidate)
|
||||
su := NewSessionUsage(req.SessionId, hostId, usage)
|
||||
return su
|
||||
}
|
||||
|
||||
func (m *SHostPendingUsageManager) newPendingUsage(hostId string) *SPendingUsage {
|
||||
return NewPendingUsageBySchedInfo(hostId, nil, nil)
|
||||
}
|
||||
|
||||
func (m *SHostPendingUsageManager) GetPendingUsage(hostId string) (*SPendingUsage, error) {
|
||||
return m.getPendingUsage(hostId)
|
||||
}
|
||||
@@ -75,7 +71,7 @@ func (m *SHostPendingUsageManager) GetSessionUsage(sessionId, hostId string) (*S
|
||||
return m.store.GetSessionUsage(sessionId, hostId)
|
||||
}
|
||||
|
||||
func (m *SHostPendingUsageManager) AddPendingUsage(req *api.SchedInfo, candidate *schedapi.CandidateResource) {
|
||||
func (m *SHostPendingUsageManager) AddPendingUsage(guestId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) {
|
||||
hostId := candidate.HostId
|
||||
|
||||
sessionUsage, _ := m.GetSessionUsage(req.SessionId, hostId)
|
||||
@@ -83,25 +79,25 @@ func (m *SHostPendingUsageManager) AddPendingUsage(req *api.SchedInfo, candidate
|
||||
sessionUsage = m.newSessionUsage(req, hostId, candidate)
|
||||
sessionUsage.StartTimer()
|
||||
}
|
||||
m.addSessionUsage(candidate.HostId, sessionUsage)
|
||||
m.addSessionUsage(candidate.HostId, guestId, sessionUsage)
|
||||
if candidate.BackupCandidate != nil {
|
||||
m.AddPendingUsage(req, candidate.BackupCandidate)
|
||||
m.AddPendingUsage(guestId, req, candidate.BackupCandidate)
|
||||
}
|
||||
}
|
||||
|
||||
// addSessionUsage add pending usage and session usage
|
||||
func (m *SHostPendingUsageManager) addSessionUsage(hostId string, usage *SessionPendingUsage) {
|
||||
func (m *SHostPendingUsageManager) addSessionUsage(hostId, guestId string, usage *SessionPendingUsage) {
|
||||
ctx := context.Background()
|
||||
lockman.LockClass(ctx, m, hostId)
|
||||
defer lockman.ReleaseClass(ctx, m, hostId)
|
||||
|
||||
pendingUsage, _ := m.getPendingUsage(hostId)
|
||||
if pendingUsage == nil {
|
||||
pendingUsage = m.newPendingUsage(hostId)
|
||||
pendingUsage = NewPendingUsageBySchedInfo(hostId, nil, nil)
|
||||
}
|
||||
// add pending usage
|
||||
pendingUsage.Add(usage.Usage)
|
||||
usage.AddCount()
|
||||
pendingUsage.Add(usage.Usage, guestId)
|
||||
usage.AddCount(guestId)
|
||||
m.store.SetSessionUsage(usage.SessionId, hostId, usage)
|
||||
m.store.SetPendingUsage(hostId, pendingUsage)
|
||||
}
|
||||
@@ -186,11 +182,11 @@ type SessionPendingUsage struct {
|
||||
cancelCh chan string
|
||||
}
|
||||
|
||||
func NewSessionUsage(sid, hostId string) *SessionPendingUsage {
|
||||
func NewSessionUsage(sid, hostId string, usage *SPendingUsage) *SessionPendingUsage {
|
||||
su := &SessionPendingUsage{
|
||||
HostId: hostId,
|
||||
SessionId: sid,
|
||||
Usage: NewPendingUsageBySchedInfo(hostId, nil, nil),
|
||||
Usage: usage,
|
||||
count: 0,
|
||||
countLock: new(sync.Mutex),
|
||||
cancelCh: make(chan string),
|
||||
@@ -202,16 +198,21 @@ func (su *SessionPendingUsage) GetHostId() string {
|
||||
return su.Usage.HostId
|
||||
}
|
||||
|
||||
func (su *SessionPendingUsage) AddCount() {
|
||||
func (su *SessionPendingUsage) AddCount(guestId string) {
|
||||
su.countLock.Lock()
|
||||
defer su.countLock.Unlock()
|
||||
su.count++
|
||||
su.Usage.PendingGuestIds[guestId] = struct{}{}
|
||||
}
|
||||
|
||||
func (su *SessionPendingUsage) SubCount() {
|
||||
su.countLock.Lock()
|
||||
defer su.countLock.Unlock()
|
||||
su.count--
|
||||
for guestId, _ := range su.Usage.PendingGuestIds {
|
||||
delete(su.Usage.PendingGuestIds, guestId)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
type SResourcePendingUsage struct {
|
||||
@@ -295,6 +296,8 @@ type SPendingUsage struct {
|
||||
CpuPin map[int]int
|
||||
Memory int
|
||||
|
||||
PendingGuestIds map[string]struct{}
|
||||
|
||||
// nodeId: memSizeMB
|
||||
NumaMemPin map[int]int
|
||||
IsolatedDevice int
|
||||
@@ -306,9 +309,10 @@ type SPendingUsage struct {
|
||||
|
||||
func NewPendingUsageBySchedInfo(hostId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) *SPendingUsage {
|
||||
u := &SPendingUsage{
|
||||
HostId: hostId,
|
||||
DiskUsage: NewResourcePendingUsage(nil),
|
||||
NetUsage: NewResourcePendingUsage(nil),
|
||||
HostId: hostId,
|
||||
DiskUsage: NewResourcePendingUsage(nil),
|
||||
NetUsage: NewResourcePendingUsage(nil),
|
||||
PendingGuestIds: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
// group init
|
||||
@@ -385,7 +389,7 @@ func (self *SPendingUsage) ToMap() map[string]interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
|
||||
func (self *SPendingUsage) Add(sUsage *SPendingUsage, addGuestId string) {
|
||||
self.Cpu = self.Cpu + sUsage.Cpu
|
||||
for k, v1 := range sUsage.CpuPin {
|
||||
if v2, ok := self.CpuPin[k]; ok {
|
||||
@@ -395,6 +399,17 @@ func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
|
||||
}
|
||||
}
|
||||
|
||||
for guestId := range sUsage.PendingGuestIds {
|
||||
if _, ok := self.PendingGuestIds[guestId]; !ok {
|
||||
log.Infof("add guest %s in pending usage", guestId)
|
||||
self.PendingGuestIds[guestId] = struct{}{}
|
||||
}
|
||||
}
|
||||
if addGuestId != "" {
|
||||
log.Infof("add guest %s in pending usage", addGuestId)
|
||||
self.PendingGuestIds[addGuestId] = struct{}{}
|
||||
}
|
||||
|
||||
self.Memory = self.Memory + sUsage.Memory
|
||||
for k, v1 := range sUsage.NumaMemPin {
|
||||
if v2, ok := self.NumaMemPin[k]; ok {
|
||||
@@ -423,6 +438,11 @@ func (self *SPendingUsage) Sub(sUsage *SPendingUsage) {
|
||||
}
|
||||
}
|
||||
|
||||
for guestId := range sUsage.PendingGuestIds {
|
||||
log.Infof("delete guest %s in pending usage", guestId)
|
||||
delete(self.PendingGuestIds, guestId)
|
||||
}
|
||||
|
||||
self.Memory = quotas.NonNegative(self.Memory - sUsage.Memory)
|
||||
for k, v1 := range sUsage.NumaMemPin {
|
||||
if v2, ok := self.NumaMemPin[k]; ok {
|
||||
|
||||
Reference in New Issue
Block a user