From 4d8e0ba1b4cf98303c2beb94de151471efb0148b Mon Sep 17 00:00:00 2001
From: Rain
Date: Sun, 26 Apr 2020 14:06:50 +0800
Subject: [PATCH] fix/autoscaling: fix some problems
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. ScalingGroupGuestManager.Query now appends NotEquals(...) by default
2. ScalingGroup.Scale: widen the scope of the lock, and move the cooling-time
   update inside it as well
3. climc: add scaling-group-show
---
 cmd/climc/shell/scaling_group.go          | 14 +++++++++++
 pkg/compute/models/guests.go              |  2 +-
 pkg/compute/models/scaling_group.go       | 25 +++++++++++--------
 pkg/compute/models/scaling_policy.go      |  5 +---
 pkg/compute/models/scalinggroup_guest.go  |  8 +++++-
 .../tasks/guest_detach_scalinggroup.go    |  2 +-
 pkg/controller/autoscaling/controller.go  | 17 ++++++++++---
 7 files changed, 52 insertions(+), 21 deletions(-)

diff --git a/cmd/climc/shell/scaling_group.go b/cmd/climc/shell/scaling_group.go
index 01afab3d3b..354015e0a5 100644
--- a/cmd/climc/shell/scaling_group.go
+++ b/cmd/climc/shell/scaling_group.go
@@ -41,6 +41,20 @@ func init() {
         printList(result, modules.ScalingGroup.GetColumns(s))
         return nil
     })
+
+    type ScalingGroupShowOptions struct {
+        ID string
+    }
+    R(&ScalingGroupShowOptions{}, "scaling-group-show", "Show scaling group", func(s *mcclient.ClientSession,
+        args *ScalingGroupShowOptions) error {
+        result, err := modules.ScalingGroup.Get(s, args.ID, nil)
+        if err != nil {
+            return err
+        }
+        printObject(result)
+        return nil
+    })
+
     type ScalingGroupCreateOptions struct {
         NAME string
diff --git a/pkg/compute/models/guests.go b/pkg/compute/models/guests.go
index cbae926338..988b514bc7 100644
--- a/pkg/compute/models/guests.go
+++ b/pkg/compute/models/guests.go
@@ -210,7 +210,7 @@ func (manager *SGuestManager) ListItemFilter(
         q = q.In("id", diskQ.SubQuery())
     }

-    scalingGroupQ := ScalingGroupGuestManager.Query("guest_id").NotEquals("guest_status", api.SG_GUEST_STATUS_PENDING_REMOVE).Snapshot()
+    scalingGroupQ := ScalingGroupGuestManager.Query("guest_id").Snapshot()
     scalingGroupQ, err = manager.SScalingGroupResourceBaseManager.ListItemFilter(ctx, scalingGroupQ, userCred, query.ScalingGroupFilterListInput)
     if err != nil {
         return nil, errors.Wrap(err, "SScaligGroupResourceBaseManager.ListItemFilter")
diff --git a/pkg/compute/models/scaling_group.go b/pkg/compute/models/scaling_group.go
index b26af3d145..8f5958d7a0 100644
--- a/pkg/compute/models/scaling_group.go
+++ b/pkg/compute/models/scaling_group.go
@@ -379,7 +379,7 @@ func (sgm *SScalingGroupManager) FetchCustomizeColumns(

 func (sg *SScalingGroup) GuestNumber() (int, error) {
     q := GuestManager.Query().In("id", ScalingGroupGuestManager.Query("guest_id").Equals("scaling_group_id",
-        sg.Id).NotEquals("guest_status", api.SG_GUEST_STATUS_PENDING_REMOVE).SubQuery()).IsFalse("pending_deleted")
+        sg.Id).SubQuery()).IsFalse("pending_deleted")
     return q.CountWithError()
 }

@@ -440,8 +440,6 @@ type sExecResult struct {
 func (sg *SScalingGroup) exec(ctx context.Context, action IScalingAction) (ret sExecResult) {
     ret.code = 3
     ret.intanceNum = -1
-    lockman.LockObject(ctx, sg)
-    defer lockman.ReleaseObject(ctx, sg)
     // query again to fetch the latest desire instance number of sg
     model, err := ScalingGroupManager.FetchById(sg.Id)
     if err != nil {
@@ -497,20 +495,27 @@ func (sg *SScalingGroup) exec(ctx context.Context, action IScalingAction) (ret s
 // Scale will modify SScalingGroup.DesireInstanceNumber and generate SScalingActivity based on the trigger and its
 // corresponding SScalingPolicy.
-func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerDesc, action IScalingAction) (bool, error) {
+func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerDesc, action IScalingAction,
+    coolingTime int) error {
+    lockman.LockObject(ctx, sg)
+    defer lockman.ReleaseObject(ctx, sg)
     isExec := false
+    defer func() {
+        if isExec && coolingTime > 0 {
+            sg.SetAllowScaleTime(time.Now().Add(time.Duration(coolingTime) * time.Second))
+        }
+    }()
     if sg.Enabled.IsFalse() {
-        return isExec, nil
+        return nil
     }
     scalingActivity, err := ScalingActivityManager.CreateScalingActivity(sg.Id, triggerDesc.TriggerDescription(),
         api.SA_STATUS_EXEC)
     if err != nil {
-        return isExec, errors.Wrapf(err, "create ScalingActivity whose ScalingGroup is %s error", sg.Id)
+        return errors.Wrapf(err, "create ScalingActivity whose ScalingGroup is %s error", sg.Id)
     }
     if action.CheckCoolTime() && !sg.AllowScale() {
         err = scalingActivity.SetReject("",
-            fmt.Sprintf("The Cooling Time limit the execution time of the policy to at least: %s",
-                sg.AllowScaleTime))
-        return isExec, nil
+            fmt.Sprintf("The cooling time delays execution of the policy until at least %s", sg.AllowScaleTime.Format("2006-01-02 15:04:05")))
+        return nil
     }

     ret := sg.exec(ctx, action)
@@ -530,7 +535,7 @@ func (sg *SScalingGroup) Scale(ctx context.Context, triggerDesc IScalingTriggerD
     if err != nil {
         log.Errorf("ScalingActivity set result failed: %s", err.Error())
     }
-    return isExec, nil
+    return nil
 }

 func (sgm *SScalingGroupManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
diff --git a/pkg/compute/models/scaling_policy.go b/pkg/compute/models/scaling_policy.go
index b26451da0c..f91de52650 100644
--- a/pkg/compute/models/scaling_policy.go
+++ b/pkg/compute/models/scaling_policy.go
@@ -400,13 +400,10 @@ func (sp *SScalingPolicy) PerformTrigger(ctx context.Context, userCred mcclient.
         }
         triggerDesc = trigger
     }
-    isExec, err := sg.Scale(ctx, triggerDesc, sp)
+    err = sg.Scale(ctx, triggerDesc, sp, sp.CoolingTime)
     if err != nil {
         return nil, errors.Wrap(err, "ScalingPolicy.Scale")
     }
-    if isExec && sp.CoolingTime > 0 {
-        sg.SetAllowScaleTime(time.Now().Add(time.Duration(sp.CoolingTime) * time.Second))
-    }
     return nil, err
 }
diff --git a/pkg/compute/models/scalinggroup_guest.go b/pkg/compute/models/scalinggroup_guest.go
index 209a4cd65d..0191cb02ac 100644
--- a/pkg/compute/models/scalinggroup_guest.go
+++ b/pkg/compute/models/scalinggroup_guest.go
@@ -19,6 +19,7 @@ import (
     "time"

     "yunion.io/x/pkg/tristate"
+    "yunion.io/x/sqlchemy"

     "yunion.io/x/onecloud/pkg/apis/compute"
     "yunion.io/x/onecloud/pkg/cloudcommon/db"
@@ -77,7 +78,7 @@ func (sgg *SScalingGroupGuest) Detach(ctx context.Context, userCred mcclient.Tok

 func (sggm *SScalingGroupGuestManager) Fetch(scalingGroupId, guestId string) ([]SScalingGroupGuest, error) {
     sggs := make([]SScalingGroupGuest, 0)
-    q := sggm.Query().NotEquals("guest_status", compute.SG_GUEST_STATUS_PENDING_REMOVE)
+    q := sggm.Query()
     if len(scalingGroupId) != 0 {
         q = q.Equals("scaling_group_id", scalingGroupId)
     }
@@ -100,3 +101,8 @@ func (sgg *SScalingGroupGuest) SetGuestStatus(status string) error {
     })
     return err
 }
+
+func (sggm *SScalingGroupGuestManager) Query(fields ...string) *sqlchemy.SQuery {
+    return sggm.SVirtualJointResourceBaseManager.Query(fields...).NotEquals("guest_status",
+        compute.SG_GUEST_STATUS_PENDING_REMOVE)
+}
diff --git a/pkg/compute/tasks/guest_detach_scalinggroup.go b/pkg/compute/tasks/guest_detach_scalinggroup.go
index 41d5dd04d7..da6a6d72a8 100644
--- a/pkg/compute/tasks/guest_detach_scalinggroup.go
+++ b/pkg/compute/tasks/guest_detach_scalinggroup.go
@@ -120,7 +120,7 @@ func (self *GuestDetachScalingGroupTask) OnDeleteGuestComplete(ctx context.Conte
     logclient.AddActionLogWithStartable(self, sg, logclient.ACT_REMOVE_GUEST, fmt.Sprintf("Instance '%s' was removed", guestId), self.UserCred, true)
     if auto, _ := self.Params.Bool("auto"); !auto {
         // scale; change the desire number
-        _, err := sg.Scale(ctx, SScalingTriggerDesc{guestName}, SScalingActionDesc{})
+        err := sg.Scale(ctx, SScalingTriggerDesc{guestName}, SScalingActionDesc{}, 0)
         if err != nil {
             log.Errorf("ScalingGroup '%s' scale after removing instance '%s' failed: %s", sg.GetId(), guestId, err.Error())
         }
diff --git a/pkg/controller/autoscaling/controller.go b/pkg/controller/autoscaling/controller.go
index 83e6cb4942..96d896c0d6 100644
--- a/pkg/controller/autoscaling/controller.go
+++ b/pkg/controller/autoscaling/controller.go
@@ -514,7 +514,15 @@ func (asc *SASController) CreateInstances(
         instanceMap[instane.ID] = instane
     }
     // check all server's status
-    go asc.checkAllServer(session, guestIds, retChan)
+    var waitLimit, waitInterval time.Duration
+    if sg.Hypervisor == compute.HYPERVISOR_KVM {
+        waitLimit = 5 * time.Minute
+        waitInterval = 3 * time.Second
+    } else {
+        waitLimit = 10 * time.Minute
+        waitInterval = 10 * time.Second
+    }
+    go asc.checkAllServer(session, guestIds, retChan, waitLimit, waitInterval)

     // fourth stage: bind lb and db
     failRecord := &SFailRecord{
@@ -558,10 +566,11 @@ type SCreateRet struct {
     Status string
 }

-func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet) {
+func (asc *SASController) checkAllServer(session *mcclient.ClientSession, guestIds []string, retChan chan SCreateRet,
+    waitLimit, waitInterval time.Duration) {
     guestIDSet := sets.NewString(guestIds...)
- ticker := time.NewTicker(3 * time.Second) - timer := time.NewTimer(5 * time.Minute) + timer := time.NewTimer(waitLimit) + ticker := time.NewTicker(waitInterval) defer func() { close(retChan) ticker.Stop()
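
Note (not part of the patch): item 2 of the commit message moves the cooling-time update into Scale itself, behind the same lock as the scaling work. The sketch below is a minimal, self-contained reproduction of that pattern so it can be compiled and run outside onecloud; every name in it (group, scale, doExec) is an illustrative stand-in, and a plain sync.Mutex stands in for lockman.

package main

import (
    "fmt"
    "sync"
    "time"
)

// group stands in for SScalingGroup: one lock guarding the whole scale
// operation, plus the earliest time the next scaling may run.
type group struct {
    mu             sync.Mutex
    allowScaleTime time.Time
}

// scale mirrors the control flow of SScalingGroup.Scale after this patch:
// hold the lock for the entire operation and, in a deferred func, push back
// allowScaleTime only if an execution actually happened.
func (g *group) scale(coolingTime int, doExec func() bool) {
    g.mu.Lock()
    defer g.mu.Unlock()

    isExec := false
    defer func() {
        if isExec && coolingTime > 0 {
            g.allowScaleTime = time.Now().Add(time.Duration(coolingTime) * time.Second)
        }
    }()

    // Reject while still inside the cooling window, as Scale does via
    // action.CheckCoolTime() && !sg.AllowScale().
    if time.Now().Before(g.allowScaleTime) {
        fmt.Printf("rejected: cooling until %s\n", g.allowScaleTime.Format("2006-01-02 15:04:05"))
        return
    }
    isExec = doExec()
}

func main() {
    g := &group{}
    g.scale(300, func() bool { fmt.Println("first trigger: executed"); return true })
    g.scale(300, func() bool { fmt.Println("should not run"); return true })
}

Since deferred calls run last-in first-out, the cooling-time update still executes while the lock is held, which is what makes moving it into Scale safe against concurrent triggers.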