Files
cloudpods/pkg/cloudcommon/db/taskman/tasks.go
2026-04-23 10:18:42 +08:00

1612 lines
45 KiB
Go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package taskman
import (
"context"
"database/sql"
"fmt"
"net/http"
"reflect"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/appctx"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/gotypes"
"yunion.io/x/pkg/util/httputils"
"yunion.io/x/pkg/util/rbacscope"
"yunion.io/x/pkg/util/reflectutils"
"yunion.io/x/pkg/util/stringutils"
"yunion.io/x/pkg/util/timeutils"
"yunion.io/x/pkg/util/version"
"yunion.io/x/pkg/utils"
"yunion.io/x/sqlchemy"
"yunion.io/x/onecloud/pkg/apis"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
"yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/mcclient/auth"
"yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
"yunion.io/x/onecloud/pkg/util/ctx"
"yunion.io/x/onecloud/pkg/util/logclient"
"yunion.io/x/onecloud/pkg/util/stringutils2"
)
// Keys, stage names and status values shared by the task manager.
const (
// PARENT_TASK_ID_KEY is the Params key under which the parent task id is stored.
PARENT_TASK_ID_KEY = "parent_task_id"
// PENDING_USAGE_KEY is the base Params key of pending quota usages; see
// pendingUsageKey for the indexed variants.
PENDING_USAGE_KEY = "__pending_usage__"
// PARENT_TASK_NOTIFY_KEY is the Params key holding the URL that is notified
// when the task finishes.
PARENT_TASK_NOTIFY_KEY = "__parent_task_notifyurl"
// REQUEST_CONTEXT_KEY is the Params key holding the serialized request context.
REQUEST_CONTEXT_KEY = "__request_context"
// TASK_STAGE_FAILED is the stage set by SetStageFailed.
TASK_STAGE_FAILED = "failed"
// TASK_STAGE_COMPLETE is the stage set by SetStageComplete.
TASK_STAGE_COMPLETE = "complete"
// MAX_REMOTE_NOTIFY_TRIES bounds the retries performed by notifyRemoteTask.
MAX_REMOTE_NOTIFY_TRIES = 5
// MULTI_OBJECTS_ID is stored as ObjId and Object of tasks created by
// NewParallelTask.
MULTI_OBJECTS_ID = "[--MULTI_OBJECTS--]"
// TASK_INIT_STAGE is the stage assigned to newly created tasks.
TASK_INIT_STAGE = "on_init"
// NOTE(review): CONVERT_TASK and LANG are not referenced in the visible part
// of this file; presumably they are keys used by callers — confirm.
CONVERT_TASK = "convert_task"
LANG = "lang"
// taskStatusDone is the status recorded when a task completes or fails.
taskStatusDone = "done"
// TASK_STATUS_QUEUE is the status recorded right after a task is inserted.
TASK_STATUS_QUEUE = "queue"
)
// STaskManager is the model manager of STask records. It embeds the base
// model manager together with the projectized-resource and status-resource
// manager mix-ins.
type STaskManager struct {
db.SModelBaseManager
db.SProjectizedResourceBaseManager
db.SStatusResourceBaseManager
}
// TaskManager is the package-wide task manager instance, assigned in init.
var TaskManager *STaskManager
// userCredWidthLimit caches the column width declared on STask.UserCred (read
// from its struct tag in init); 0 means that no limit is enforced.
var userCredWidthLimit = 0
func init() {
TaskManager = &STaskManager{
SModelBaseManager: db.NewModelBaseManager(STask{}, "tasks_tbl", "task", "tasks"),
}
TaskManager.SetVirtualObject(TaskManager)
if field, ok := reflect.TypeOf(&STask{}).Elem().FieldByName("UserCred"); ok {
if widthStr := field.Tag.Get(sqlchemy.TAG_WIDTH); len(widthStr) > 0 {
userCredWidthLimit, _ = strconv.Atoi(widthStr)
}
}
TaskManager.TableSpec().AddIndex(true, "id", "created_at", "parent_task_id", "stage")
}
// STask is the persisted record of a task together with the in-memory state
// that is attached to it while a stage is being executed.
type STask struct {
db.SModelBase
// Creation time of the record.
CreatedAt time.Time `nullable:"false" created_at:"true" index:"true" get:"user" list:"user" json:"created_at"`
// Time of the last update of the record.
UpdatedAt time.Time `nullable:"false" updated_at:"true" list:"user" json:"updated_at"`
// Number of times the record has been updated.
UpdateVersion int `default:"0" nullable:"false" auto_version:"true" list:"user" json:"update_version"`
// Id is the primary key; BeforeInsert fills it with a UUID when empty.
Id string `width:"36" charset:"ascii" primary:"true" list:"user"` // Column(VARCHAR(36, charset='ascii'), primary_key=True, default=get_uuid)
STaskBase
db.SProjectizedResourceBase
// taskObject is the object of a single-object task, set by execITask.
taskObject db.IStandaloneModel `ignore:"true"`
// taskObjects are the objects of a multi-object task, set by execITask.
taskObjects []db.IStandaloneModel `ignore:"true"`
// Sub-task counters; they are tagged ignore:"true" and are filled from the
// extra fields appended by ListItemFilter when sub-task counting is requested.
SubTaskCount int `ignore:"true" json:"sub_task_count"`
FailSubTaskCnt int `ignore:"true" json:"fail_sub_task_cnt"`
SuccSubTaskCnt int `ignore:"true" json:"succ_sub_task_cnt"`
}
// CreateByInsertOrUpdate always returns false for tasks.
// NOTE(review): presumably this tells the db layer to use a plain insert
// instead of insert-or-update — confirm against the db package.
func (manager *STaskManager) CreateByInsertOrUpdate() bool {
	const insertOrUpdate = false
	return insertOrUpdate
}
// AllowListItems always returns true: the arguments are not inspected.
func (manager *STaskManager) AllowListItems(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
) bool {
	return true
}
// AllowCreateItem always returns false: the arguments are not inspected.
func (manager *STaskManager) AllowCreateItem(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
	data jsonutils.JSONObject,
) bool {
	return false
}
// FilterById restricts q to the task whose id equals idStr.
func (manager *STaskManager) FilterById(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
	byId := q.Equals("id", idStr)
	return byId
}
// FilterByNotId restricts q to the tasks whose id differs from idStr.
func (manager *STaskManager) FilterByNotId(q *sqlchemy.SQuery, idStr string) *sqlchemy.SQuery {
	exceptId := q.NotEquals("id", idStr)
	return exceptId
}
// FilterByName matches name against the id column, so a lookup by name
// behaves exactly like a lookup by id.
func (manager *STaskManager) FilterByName(q *sqlchemy.SQuery, name string) *sqlchemy.SQuery {
	byName := q.Equals("id", name)
	return byName
}
// PerformAction runs the task identified by taskId with data as input, by
// calling runTask. On success it returns {"result": "ok"}; a runTask error is
// returned wrapped. ctx, userCred and query are not used.
func (manager *STaskManager) PerformAction(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	taskId string,
	query jsonutils.JSONObject,
	data jsonutils.JSONObject,
) (jsonutils.JSONObject, error) {
	if err := runTask(taskId, data); err != nil {
		return nil, errors.Wrapf(err, "runTask")
	}
	ack := jsonutils.NewDict()
	ack.Add(jsonutils.NewString("ok"), "result")
	return ack, nil
}
// PreCheckPerformAction accepts every action on a task: it always returns nil
// and does not inspect its arguments.
func (task *STask) PreCheckPerformAction(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	action string,
	query jsonutils.JSONObject,
	data jsonutils.JSONObject,
) error {
	return nil
}
// GetOwnerId returns the owner reported by the embedded
// SProjectizedResourceBase.
func (task *STask) GetOwnerId() mcclient.IIdentityProvider {
	owner := task.SProjectizedResourceBase.GetOwnerId()
	return owner
}
// FilterByOwner restricts q to tasks that have a task object visible to owner
// in scope. The ownership filter is applied to a query over the task-object
// table; only when that filter actually altered the query is q joined with
// the resulting task ids, otherwise q is returned unchanged.
func (manager *STaskManager) FilterByOwner(
	ctx context.Context,
	q *sqlchemy.SQuery,
	man db.FilterByOwnerProvider,
	userCred mcclient.TokenCredential,
	owner mcclient.IIdentityProvider,
	scope rbacscope.TRbacScope,
) *sqlchemy.SQuery {
	objQuery := TaskObjectManager.Query("task_id")
	objQuery = objQuery.Snapshot()
	objQuery = manager.SProjectizedResourceBaseManager.FilterByOwner(ctx, objQuery, man, userCred, owner, scope)
	if !objQuery.IsAltered() {
		return q
	}
	ownedTasks := objQuery.SubQuery()
	return q.Join(ownedTasks, sqlchemy.Equals(q.Field("id"), ownedTasks.Field("task_id")))
}
// FetchOwnerId delegates to the embedded SProjectizedResourceBaseManager to
// extract the owner from data.
func (manager *STaskManager) FetchOwnerId(
	ctx context.Context,
	data jsonutils.JSONObject,
) (mcclient.IIdentityProvider, error) {
	return manager.SProjectizedResourceBaseManager.FetchOwnerId(ctx, data)
}
// FetchTaskById is the exported form of fetchTask: it returns the task with
// the given id, or nil when it cannot be loaded.
func (manager *STaskManager) FetchTaskById(taskId string) *STask {
	task := manager.fetchTask(taskId)
	return task
}
// ValidateDeleteCondition rejects every delete request with a forbidden error.
func (task *STask) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
	denied := httperrors.NewForbiddenError("forbidden")
	return denied
}
// ValidateUpdateCondition rejects every update request with a forbidden error.
func (task *STask) ValidateUpdateCondition(ctx context.Context) error {
	denied := httperrors.NewForbiddenError("forbidden")
	return denied
}
// BeforeInsert assigns a fresh UUID to the task when no id has been set yet.
func (task *STask) BeforeInsert() {
	if len(task.Id) > 0 {
		return
	}
	task.Id = stringutils.UUID4()
}
// GetId returns the id of the task.
func (task *STask) GetId() string {
	id := task.Id
	return id
}
// GetName returns the task name (TaskName); tasks have no separate name field.
func (task *STask) GetName() string {
	name := task.TaskName
	return name
}
// saveStartAt persists the current UTC time as StartAt the first time it is
// called for a task; it does nothing when StartAt is already set. A failed
// update is only logged.
func (task *STask) saveStartAt() {
	if !task.StartAt.IsZero() {
		return
	}
	stampStart := func() error {
		task.StartAt = timeutils.UtcNow()
		return nil
	}
	if _, err := db.Update(task, stampStart); err != nil {
		log.Errorf("task %s save start_at fail: %s", task.String(), err)
	}
}
// fetchTaskParams assembles the Params dict of a new task.
//
// It starts from a copy of taskData without the reserved parent-task and
// pending-usage keys (or from an empty dict when taskData is nil), then adds:
//   - the request context found in ctx, when it is not zero;
//   - the parent task id and/or notify URL, taken from the explicit arguments
//     when at least one of them is set, otherwise from the request context
//     (the context task id is only used when it carries no notify URL);
//   - every non-nil pending usage, stored under pendingUsageKey(i).
//
// taskName is only used for logging.
func fetchTaskParams(
	ctx context.Context,
	taskName string,
	taskData *jsonutils.JSONDict,
	parentTaskId string,
	parentTaskNotifyUrl string,
	pendingUsages []quotas.IQuota,
) *jsonutils.JSONDict {
	var params *jsonutils.JSONDict
	if taskData == nil {
		params = jsonutils.NewDict()
	} else {
		reserved := []string{
			PARENT_TASK_ID_KEY, PARENT_TASK_NOTIFY_KEY, PENDING_USAGE_KEY,
		}
		// Indexed pending-usage keys are consecutive; stop at the first gap.
		for idx := 1; taskData.Contains(pendingUsageKey(idx)); idx++ {
			reserved = append(reserved, pendingUsageKey(idx))
		}
		params = taskData.CopyExcludes(reserved...)
	}

	reqCtx := appctx.FetchAppContextData(ctx)
	if !reqCtx.IsZero() {
		params.Add(jsonutils.Marshal(&reqCtx), REQUEST_CONTEXT_KEY)
	}

	explicitParent := len(parentTaskId) > 0 || len(parentTaskNotifyUrl) > 0
	switch {
	case explicitParent:
		if len(parentTaskId) > 0 {
			params.Add(jsonutils.NewString(parentTaskId), PARENT_TASK_ID_KEY)
		}
		if len(parentTaskNotifyUrl) > 0 {
			params.Add(jsonutils.NewString(parentTaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
			log.Infof("%s notify parent url: %s", taskName, parentTaskNotifyUrl)
		}
	case !reqCtx.IsZero():
		if len(reqCtx.TaskId) > 0 && len(reqCtx.TaskNotifyUrl) == 0 {
			params.Add(jsonutils.NewString(reqCtx.TaskId), PARENT_TASK_ID_KEY)
		}
		if len(reqCtx.TaskNotifyUrl) > 0 {
			params.Add(jsonutils.NewString(reqCtx.TaskNotifyUrl), PARENT_TASK_NOTIFY_KEY)
			log.Infof("%s notify parent url: %s", taskName, reqCtx.TaskNotifyUrl)
		}
	}

	for idx, usage := range pendingUsages {
		if gotypes.IsNil(usage) {
			continue
		}
		params.Add(jsonutils.Marshal(usage), pendingUsageKey(idx))
	}
	return params
}
// NewTask creates and persists a task named taskName that operates on obj.
//
// The task is inserted at stage TASK_INIT_STAGE with status
// TASK_STATUS_QUEUE, a task-object row linking it to obj is inserted, and the
// project and domain of that row are copied onto the task. When the task has
// a parent that can be fetched, a sub-task row is registered under the
// parent's current stage.
//
// It returns an error when the serialized userCred exceeds the width of the
// user-credential column, when taskName is not a registered task, or when one
// of the inserts fails. A failure to copy project/domain onto the task is
// logged but does not fail the call.
func (manager *STaskManager) NewTask(
	ctx context.Context,
	taskName string,
	obj db.IStandaloneModel,
	userCred mcclient.TokenCredential,
	taskData *jsonutils.JSONDict,
	parentTaskId string,
	parentTaskNotifyUrl string,
	pendingUsage ...quotas.IQuota,
) (*STask, error) {
	if userCredWidthLimit > 0 && len(userCred.String()) > userCredWidthLimit {
		return nil, fmt.Errorf("Too many permissions for user %s", userCred.GetUserName())
	}
	if !isTaskExist(taskName) {
		return nil, fmt.Errorf("task %s not found", taskName)
	}

	data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
	task := &STask{
		STaskBase: STaskBase{
			ObjType:      obj.Keyword(),
			ObjId:        obj.GetId(),
			Object:       obj.GetName(),
			TaskName:     taskName,
			UserCred:     userCred,
			Params:       data,
			Stage:        TASK_INIT_STAGE,
			ParentTaskId: parentTaskId,
		},
	}
	task.SetModelManager(manager, task)
	err := manager.TableSpec().Insert(ctx, task)
	if err != nil {
		log.Errorf("Task insert error %s", err)
		return nil, err
	}
	task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)

	{
		to, err := TaskObjectManager.insertObject(ctx, task.Id, obj)
		if err != nil {
			log.Errorf("Taskobject insert error %s", err)
			return nil, errors.Wrap(err, "TaskObjectManager.insertObject")
		}
		_, err = db.Update(task, func() error {
			task.ProjectId = to.ProjectId
			task.DomainId = to.DomainId
			return nil
		})
		if err != nil {
			// Best effort: the task itself is already stored, so only report.
			log.Errorf("Task %s update project and domain error %s", task.Id, err)
		}
	}

	parentTask := task.GetParentTask()
	if parentTask != nil {
		st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
		st.SetModelManager(SubTaskManager, st)
		err := SubTaskManager.TableSpec().Insert(ctx, st)
		if err != nil {
			log.Errorf("Subtask insert error %s", err)
			return nil, err
		}
	}
	return task, nil
}
// NewParallelTask creates and persists a task named taskName that operates on
// several objects at once.
//
// The task is stored with ObjId and Object set to MULTI_OBJECTS_ID and the
// object type of objs[0]; one task-object row is inserted per object and the
// distinct project and domain ids of those rows are stored on the task as
// comma-separated lists. When the task has a parent that can be fetched, a
// sub-task row is registered under the parent's current stage.
//
// It returns an error when the serialized userCred exceeds the width of the
// user-credential column (same rule as NewTask), when taskName is not a
// registered task, when objs is empty, or when one of the inserts fails. A
// failure to store the project/domain lists is logged but does not fail the
// call.
func (manager *STaskManager) NewParallelTask(
	ctx context.Context,
	taskName string,
	objs []db.IStandaloneModel,
	userCred mcclient.TokenCredential,
	taskData *jsonutils.JSONDict,
	parentTaskId string,
	parentTaskNotifyUrl string,
	pendingUsage ...quotas.IQuota,
) (*STask, error) {
	// Mirror the guard of NewTask. A nil credential is let through so that
	// callers which previously passed nil keep their behavior.
	if userCredWidthLimit > 0 && userCred != nil && len(userCred.String()) > userCredWidthLimit {
		return nil, fmt.Errorf("Too many permissions for user %s", userCred.GetUserName())
	}
	if !isTaskExist(taskName) {
		return nil, fmt.Errorf("task %s not found", taskName)
	}
	if len(objs) == 0 {
		return nil, fmt.Errorf("failed to do task %s with zero objs", taskName)
	}

	log.Debugf("number of objs: %d", len(objs))

	data := fetchTaskParams(ctx, taskName, taskData, parentTaskId, parentTaskNotifyUrl, pendingUsage)
	task := &STask{
		STaskBase: STaskBase{
			ObjType:      objs[0].Keyword(),
			Object:       MULTI_OBJECTS_ID,
			ObjId:        MULTI_OBJECTS_ID,
			TaskName:     taskName,
			UserCred:     userCred,
			Params:       data,
			Stage:        TASK_INIT_STAGE,
			ParentTaskId: parentTaskId,
		},
	}
	task.SetModelManager(manager, task)
	err := manager.TableSpec().Insert(ctx, task)
	if err != nil {
		log.Errorf("Task insert error %s", err)
		return nil, err
	}
	task.SetProgressAndStatus(0, TASK_STATUS_QUEUE)

	domainIds := stringutils2.NewSortedStrings(nil)
	tenantIds := stringutils2.NewSortedStrings(nil)
	for i := range objs {
		to, err := TaskObjectManager.insertObject(ctx, task.Id, objs[i])
		if err != nil {
			log.Errorf("Taskobject insert error %s", err)
			return nil, errors.Wrap(err, "insert task object")
		}
		tenantIds = tenantIds.Append(to.ProjectId)
		domainIds = domainIds.Append(to.DomainId)
	}
	_, err = db.Update(task, func() error {
		task.DomainId = strings.Join(domainIds, ",")
		task.ProjectId = strings.Join(tenantIds, ",")
		return nil
	})
	if err != nil {
		// Best effort: the task itself is already stored, so only report.
		log.Errorf("Task %s update project and domain error %s", task.Id, err)
	}

	parentTask := task.GetParentTask()
	if parentTask != nil {
		st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
		st.SetModelManager(SubTaskManager, st)
		err := SubTaskManager.TableSpec().Insert(ctx, st)
		if err != nil {
			log.Errorf("Subtask insert error %s", err)
			return nil, err
		}
	}
	return task, nil
}
// fetchTask loads the task with id idStr. It returns nil (after logging) when
// a model object cannot be created or the query yields no row. The returned
// task always has non-nil Params.
func (manager *STaskManager) fetchTask(idStr string) *STask {
	model, err := db.NewModelObject(manager)
	if err != nil {
		log.Errorf("New task object fail: %s", err)
		return nil
	}
	if err = manager.Query().Equals("id", idStr).First(model); err != nil {
		log.Errorf("GetTask %s fail: %s", idStr, err)
		return nil
	}
	found := model.(*STask)
	found.fixParams()
	return found
}
// fixParams guarantees that task.Params is non-nil by substituting an empty
// dict for a nil one.
func (task *STask) fixParams() {
	if task.Params != nil {
		return
	}
	task.Params = jsonutils.NewDict()
}
// execTask loads the task with id taskId and executes its current stage with
// data. It silently returns when the task cannot be fetched (fetchTask has
// already logged the reason).
func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
	if baseTask := manager.fetchTask(taskId); baseTask != nil {
		manager.execTaskObject(baseTask, data)
	}
}
// execTaskObject instantiates the registered implementation of
// baseTask.TaskName and runs its current stage through execITask, in
// single-object mode when the type implements ITaskType and in multi-object
// mode when it implements IBatchTaskType. Unknown task names and unsupported
// types are logged and ignored.
func (manager *STaskManager) execTaskObject(baseTask *STask, data jsonutils.JSONObject) {
	taskType, registered := taskTable[baseTask.TaskName]
	if !registered {
		log.Errorf("Cannot find task %s", baseTask.TaskName)
		return
	}
	log.Debugf("Do task %s(%s) with data %s at stage %s", taskType, baseTask.Id, data, baseTask.Stage)

	taskValue := reflect.New(taskType)
	valueType := taskValue.Type()
	switch {
	case valueType.Implements(ITaskType):
		execITask(taskValue, baseTask, data, false)
	case valueType.Implements(IBatchTaskType):
		execITask(taskValue, baseTask, data, true)
	default:
		log.Errorf("Unsupported task type?? %s", valueType)
	}
}
// sSortedObjects implements sort.Interface over a slice of models, ordering
// them by ascending id.
type sSortedObjects []db.IStandaloneModel

// Len returns the number of models.
func (a sSortedObjects) Len() int {
	return len(a)
}

// Swap exchanges the models at positions i and j.
func (a sSortedObjects) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the id of model i sorts before the id of model j.
func (a sSortedObjects) Less(i, j int) bool {
	left, right := a[i].GetId(), a[j].GetId()
	return left < right
}
// execITask invokes, through reflection, the method of taskValue that handles
// the current stage of task.
//
// taskValue is the reflect.Value of a newly allocated task implementation (see
// execTaskObject); task is the persisted task record; odata is the data handed
// to the stage handler (nil is replaced by an empty dict); isMulti selects
// between the single-object and the multi-object handler signature.
//
// The handler is called as method(ctx, obj, data), or method(ctx, objs, data)
// when isMulti is set. The task objects stay locked through lockman until
// execITask returns. A panic raised by the handler is recovered, reported and
// turned into a failed stage. Every early failure path marks the stage as
// failed and saves the request context, except a failure to fill the embedded
// base task, which is only logged.
func execITask(taskValue reflect.Value, task *STask, odata jsonutils.JSONObject, isMulti bool) {
// Rebuild the request context stored in the task params (with the parent
// task id and notify URL cleared, see GetRequestContext).
ctxData := task.GetRequestContext()
ctx := ctxData.GetContext()
task.saveStartAt()
taskFailed := false
var data jsonutils.JSONObject
if odata != nil {
switch dictdata := odata.(type) {
case *jsonutils.JSONDict:
// A dict carrying a non-empty "__status__" other than "OK" is treated as a
// failure report: the current stage is recorded under "__stage__" and, when
// no "__reason__" is present, the dict itself (minus the bookkeeping keys)
// becomes the reason.
taskStatus, _ := odata.GetString("__status__")
if len(taskStatus) > 0 && taskStatus != "OK" {
taskFailed = true
dictdata.Set("__stage__", jsonutils.NewString(task.Stage))
if !dictdata.Contains("__reason__") {
reasonJson := dictdata.CopyExcludes("__status__", "__stage__")
dictdata.Set("__reason__", reasonJson)
}
/*if vdata, ok := data.(*jsonutils.JSONDict); ok {
reason, err := vdata.Get("__reason__") // only dict support Get
if err != nil {
reason = jsonutils.NewString(fmt.Sprintf("Task failed due to unknown remote errors! %s", odata))
vdata.Set("__reason__", reason)
}
}*/
}
data = dictdata
default:
data = odata
}
} else {
data = jsonutils.NewDict()
}
// Derive the handler method name from the stage: failures are routed to
// "<stage>Failed" (or "<stage>_failed" for underscore-style stage names), and
// underscore-style names are converted with utils.Kebab2Camel.
stageName := task.Stage
if taskFailed {
stageName = fmt.Sprintf("%sFailed", task.Stage)
if strings.Contains(stageName, "_") {
stageName = fmt.Sprintf("%s_failed", task.Stage)
}
}
if strings.Contains(stageName, "_") {
stageName = utils.Kebab2Camel(stageName, "_")
}
funcValue := taskValue.MethodByName(stageName)
if !funcValue.IsValid() || funcValue.IsNil() {
msg := fmt.Sprintf("Stage %s not found", stageName)
if taskFailed {
// failed handler is optional, ignore the error
log.Warningf("%s", msg)
// Without a failure handler the incoming data itself is used as the reason.
msg, _ = data.GetString()
} else {
log.Errorf("%s", msg)
}
task.SetStageFailed(ctx, jsonutils.NewString(msg))
task.SaveRequestContext(&ctxData)
return
}
// Resolve the model manager of the task's object type.
objManager := db.GetModelManager(task.ObjType)
if objManager == nil {
msg := fmt.Sprintf("model %s %s(%s) not found??? ...", task.ObjType, task.Object, task.ObjId)
log.Errorf("%s", msg)
task.SetStageFailed(ctx, jsonutils.NewString(msg))
task.SaveRequestContext(&ctxData)
return
}
// log.Debugf("objManager: %s", objManager)
objResManager, ok := objManager.(db.IStandaloneModelManager)
if !ok {
msg := fmt.Sprintf("model %s %s(%s) is not a resource??? ...", task.ObjType, task.Object, task.ObjId)
log.Errorf("%s", msg)
task.SetStageFailed(ctx, jsonutils.NewString(msg))
task.SaveRequestContext(&ctxData)
return
}
// Handler arguments: [0] context, [1] object(s), [2] data.
params := make([]reflect.Value, 3)
params[0] = reflect.ValueOf(ctx)
if isMulti {
objIds := TaskObjectManager.GetObjectIds(task)
objs := make([]db.IStandaloneModel, len(objIds))
for i, objId := range objIds {
obj, err := objResManager.FetchById(objId)
if err != nil {
// When the regular fetch finds no row, retry with RawFetchById before
// giving up.
if errors.Cause(err) == sql.ErrNoRows {
obj, err = objResManager.RawFetchById(objId)
if err == nil {
// find it
} else if errors.Cause(err) == sql.ErrNoRows {
// not found resource
err = errors.ErrNotFound
}
}
if obj == nil {
msg := errors.Wrapf(err, "fail to find %s object %s", task.ObjType, objId).Error()
log.Errorln(msg)
task.SetStageFailed(ctx, jsonutils.NewString(msg))
task.SaveRequestContext(&ctxData)
return
}
}
objs[i] = obj.(db.IStandaloneModel)
}
// task.taskObjects shares its backing array with objs, so it observes the
// sort below.
task.taskObjects = objs
// sort objects by ids to avoid deadlock
sort.Sort(sSortedObjects(objs))
// The deferred releases run when execITask returns, so every object stays
// locked for the whole handler call.
for i := range objs {
lockman.LockObject(ctx, objs[i])
defer lockman.ReleaseObject(ctx, objs[i])
}
params[1] = reflect.ValueOf(objs)
} else {
obj, err := objResManager.FetchById(task.ObjId)
if err != nil {
// Same fallback as in the multi-object branch.
if errors.Cause(err) == sql.ErrNoRows {
obj, err = objResManager.RawFetchById(task.ObjId)
if err == nil {
// find it
} else if errors.Cause(err) == sql.ErrNoRows {
// not found resource
err = errors.ErrNotFound
}
}
if obj == nil {
msg := errors.Wrapf(err, "fail to find %s object %s", task.ObjType, task.ObjId).Error()
log.Errorln(msg)
task.SetStageFailed(ctx, jsonutils.NewString(msg))
task.SaveRequestContext(&ctxData)
return
}
}
task.taskObject = obj.(db.IStandaloneModel)
lockman.LockObject(ctx, obj)
defer lockman.ReleaseObject(ctx, obj)
params[1] = reflect.ValueOf(obj)
}
params[2] = reflect.ValueOf(data)
// Copy the loaded task record into the base-task struct embedded in the
// concrete task value.
filled := reflectutils.FillEmbededStructValue(taskValue.Elem(), reflect.Indirect(reflect.ValueOf(task)))
if !filled {
// NOTE(review): this path neither marks the stage as failed nor saves the
// request context — confirm that this is intended.
log.Errorf("Cannot locate baseTask embedded struct, give up...")
return
}
// Registered after the lock releases above, so on a panic this handler runs
// before the objects are unlocked.
defer func() {
if r := recover(); r != nil {
// call set stage failed, should not call task.SetStageFailed
// func SetStageFailed may be overloading
yunionconf.BugReport.SendBugReport(ctx, version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
log.Errorf("Task %s PANIC on stage %s: %v \n%s", task.TaskName, stageName, r, debug.Stack())
SetStageFailedFuncValue := taskValue.MethodByName("SetStageFailed")
SetStageFailedFuncValue.Call(
[]reflect.Value{
reflect.ValueOf(ctx),
reflect.ValueOf(jsonutils.NewString(fmt.Sprintf("%v", r))),
},
)
// The object is looked up by task.ObjId; when that lookup fails the status
// update and the action log below are skipped.
obj, err := objResManager.FetchById(task.ObjId)
if err != nil {
return
}
statusObj, ok := obj.(db.IStatusStandaloneModel)
if ok {
db.StatusBaseSetStatus(ctx, statusObj, task.GetUserCred(), apis.STATUS_UNKNOWN, fmt.Sprintf("%v", r))
}
notes := map[string]interface{}{
"Stack": string(debug.Stack()),
"Version": version.GetShortString(),
"Task": task.TaskName,
"Stage": stageName,
"Message": fmt.Sprintf("%v", r),
}
logclient.AddSimpleActionLog(obj, logclient.ACT_PANIC, notes, task.GetUserCred(), false)
}
}()
log.Debugf("Call %s(%s) %s %#v", task.TaskName, task.Id, stageName, params)
funcValue.Call(params)
// call save request context
saveRequestContextFuncValue := taskValue.MethodByName("SaveRequestContext")
saveRequestContextFuncValue.Call([]reflect.Value{reflect.ValueOf(&ctxData)})
}
// ScheduleRun hands the task, identified by its id, to runTask with data as
// input and returns runTask's error.
func (task *STask) ScheduleRun(data jsonutils.JSONObject) error {
	err := runTask(task.Id, data)
	return err
}
// IsSubtask reports whether the task has a parent task; it is an alias of
// HasParentTask.
func (task *STask) IsSubtask() bool {
	hasParent := task.HasParentTask()
	return hasParent
}
// GetParentTaskId returns the id of the parent task: the ParentTaskId field
// when it is set, otherwise the value stored in Params under
// PARENT_TASK_ID_KEY, or "" when neither exists.
func (task *STask) GetParentTaskId() string {
	if id := task.ParentTaskId; len(id) > 0 {
		return id
	}
	// A missing key yields the empty string, which is the "no parent" result.
	fromParams, _ := task.Params.GetString(PARENT_TASK_ID_KEY)
	return fromParams
}
// HasParentTask reports whether GetParentTaskId yields a non-empty id.
func (task *STask) HasParentTask() bool {
	return len(task.GetParentTaskId()) > 0
}
// GetParentTask loads the parent task through TaskManager. It returns nil
// when the task has no parent id or when the parent cannot be fetched.
func (task *STask) GetParentTask() *STask {
	parentId := task.GetParentTaskId()
	if len(parentId) == 0 {
		return nil
	}
	return TaskManager.fetchTask(parentId)
}
// GetRequestContext rebuilds the request context saved in Params under
// REQUEST_CONTEXT_KEY. The TaskId and TaskNotifyUrl of the result are always
// cleared. A zero context is returned when nothing was saved; unmarshal
// errors are ignored.
func (task *STask) GetRequestContext() appctx.AppContextData {
	var ctxData appctx.AppContextData
	if task.Params != nil {
		if saved, _ := task.Params.Get(REQUEST_CONTEXT_KEY); saved != nil {
			saved.Unmarshal(&ctxData)
		}
	}
	// clear parentTaskId
	ctxData.TaskId, ctxData.TaskNotifyUrl = "", ""
	return ctxData
}
// SaveRequestContext stores data in Params under REQUEST_CONTEXT_KEY
// (replacing any previous value) and sets EndAt to the current UTC time. A
// failed update is only logged.
func (task *STask) SaveRequestContext(data *appctx.AppContextData) {
	ctxJson := jsonutils.Marshal(data)
	log.Debugf("SaveRequestContext %s(%s) %s param %s", task.TaskName, task.Id, ctxJson, task.Params)

	storeContext := func() error {
		refreshed := task.Params.CopyExcludes(REQUEST_CONTEXT_KEY)
		refreshed.Add(ctxJson, REQUEST_CONTEXT_KEY)
		task.Params = refreshed
		task.EndAt = timeutils.UtcNow()
		return nil
	}
	_, err := db.Update(task, storeContext)

	log.Debugf("Params: %s(%s) %s", task.TaskName, task.Id, task.Params)
	if err != nil {
		log.Errorf("save_request_context fail %s", err)
	}
}
// SaveParams merges data into the task params without changing the stage; it
// is SetStage called with an empty stage name.
func (task *STask) SaveParams(data *jsonutils.JSONDict) error {
	const keepStage = ""
	return task.SetStage(keepStage, data)
}
// SetStage merges data (when non-nil) into the task params and, when
// stageName is not empty, moves the task to that stage. On a stage change the
// stage being left is appended to the "__stages" array in Params together
// with the current time as "complete_at". The update error is logged and
// returned.
func (task *STask) SetStage(stageName string, data *jsonutils.JSONDict) error {
	switchStage := func() error {
		merged := jsonutils.NewDict()
		merged.Update(task.Params)
		if data != nil {
			merged.Update(data)
		}
		if len(stageName) > 0 {
			history, _ := merged.Get("__stages")
			if history == nil {
				history = jsonutils.NewArray()
				merged.Add(history, "__stages")
			}
			historyList := history.(*jsonutils.JSONArray)
			// Record the stage that is being left, not the one being entered.
			finished := jsonutils.NewDict()
			finished.Add(jsonutils.NewString(task.Stage), "name")
			finished.Add(jsonutils.NewTimeString(time.Now()), "complete_at")
			historyList.Add(finished)
			task.Stage = stageName
		}
		task.Params = merged
		return nil
	}

	_, err := db.Update(task, switchStage)
	if err != nil {
		log.Errorf("Task %s(%s) set_stage %s fail %s", task.TaskName, task.Id, stageName, err)
	}
	return err
}
// GetObjectIdStr returns the id of the task object, or the comma-separated
// ids of all task objects for a multi-object task.
func (task *STask) GetObjectIdStr() string {
	if task.ObjId != MULTI_OBJECTS_ID {
		return task.ObjId
	}
	ids := TaskObjectManager.GetObjectIds(task)
	return strings.Join(ids, ",")
}
// GetObjectStr returns the name of the task object, or the comma-separated
// names of all task objects for a multi-object task.
func (task *STask) GetObjectStr() string {
	if task.ObjId != MULTI_OBJECTS_ID {
		return task.Object
	}
	names := TaskObjectManager.GetObjectNames(task)
	return strings.Join(names, ",")
}
// SetStageComplete moves the task to TASK_STAGE_COMPLETE, sets progress to
// 100 with status done and notifies the parent task. When data is nil or
// empty the notification body carries the id, name and type of the task
// object(s) instead; an empty, non-nil data dict is filled in place.
func (task *STask) SetStageComplete(ctx context.Context, data *jsonutils.JSONDict) {
	log.Infof("XXX TASK %s(%s) complete", task.TaskName, task.Id)
	task.SetStage(TASK_STAGE_COMPLETE, data)
	task.SetProgressAndStatus(100, taskStatusDone)

	result := data
	if result == nil {
		result = jsonutils.NewDict()
	}
	if result.Size() == 0 {
		result.Add(jsonutils.NewString(task.GetObjectIdStr()), "id")
		result.Add(jsonutils.NewString(task.GetObjectStr()), "name")
		result.Add(jsonutils.NewString(task.ObjType), "type")
	}
	task.NotifyParentTaskComplete(ctx, result, false)
}
// SetStageFailed moves the task to TASK_STAGE_FAILED, sets progress to 100
// with status done and notifies the parent task of the failure. It does
// nothing when the task is already in the failed stage.
//
// The failure is recorded in Params under "__failed_reason" as
// {"stage": <current stage>, "reason": <reason>}; when an earlier failure is
// already recorded the new entry is appended, turning the value into an array
// if necessary.
func (task *STask) SetStageFailed(ctx context.Context, reason jsonutils.JSONObject) {
	if task.Stage == TASK_STAGE_FAILED {
		log.Warningf("Task %s(%s) has been failed", task.TaskName, task.Id)
		return
	}
	log.Infof("XXX TASK %s(%s) failed: %s on stage %s", task.TaskName, task.Id, reason, task.Stage)

	entry := jsonutils.NewDict()
	entry.Add(jsonutils.NewString(task.Stage), "stage")
	if reason != nil {
		entry.Add(reason, "reason")
	}

	var failure jsonutils.JSONObject = entry
	if earlier, _ := task.Params.Get("__failed_reason"); earlier != nil {
		if list, isList := earlier.(*jsonutils.JSONArray); isList {
			list.Add(failure)
			failure = earlier
		} else {
			failure = jsonutils.NewArray(earlier, failure)
		}
	}

	payload := jsonutils.NewDict()
	payload.Add(failure, "__failed_reason")
	task.SetStage(TASK_STAGE_FAILED, payload)
	task.SetProgressAndStatus(100, taskStatusDone)
	task.NotifyParentTaskFailure(ctx, failure)
}
// NotifyParentTaskComplete reports the outcome of the task to its parent.
//
// For a local parent (a parent task id is known) the sub-task result is saved
// and, while holding the raw lock on the parent id, the parent is rescheduled
// with body once all sub-tasks of its current stage have finished; the
// outcome of that up-call is recorded with saveTaskUpCallStatus. For a remote
// parent (a notify URL is stored in Params) body is posted to the URL in a
// separate goroutine.
func (task *STask) NotifyParentTaskComplete(ctx context.Context, body *jsonutils.JSONDict, failed bool) {
	log.Infof("notify_parent_task_complete: %s(%s) params %s", task.TaskName, task.Id, task.Params)
	parentTaskId := task.GetParentTaskId()
	parentTaskNotify, _ := task.Params.GetString(PARENT_TASK_NOTIFY_KEY)

	if len(parentTaskId) > 0 {
		if sub := SubTaskManager.GetSubTask(parentTaskId, task.Id); sub != nil {
			sub.SaveResults(failed, body)
		}

		// Wrapped in a closure so that the parent lock is released as soon as
		// the up-call is done.
		resumeParent := func() {
			lockman.LockRawObject(ctx, "tasks", parentTaskId)
			defer lockman.ReleaseRawObject(ctx, "tasks", parentTaskId)

			parent := TaskManager.fetchTask(parentTaskId)
			if parent == nil {
				saveTaskUpCallStatus(task.GetId(), parentTaskId, false, errors.Wrap(errors.ErrNotFound, "Parent task not found").Error())
				log.Errorf("Parent task %s not found", parentTaskId)
				return
			}
			if !parent.IsCurrentStageComplete() {
				return
			}
			if err := parent.ScheduleRun(body); err != nil {
				saveTaskUpCallStatus(task.GetId(), parentTaskId, false, err.Error())
				return
			}
			saveTaskUpCallStatus(task.GetId(), parentTaskId, true, "")
		}
		resumeParent()
	}

	if len(parentTaskNotify) > 0 {
		header := task.getTaskHeader()
		go notifyRemoteTask(ctx, parentTaskNotify, task.GetId(), header, body, 0)
	}
}
// notifyRemoteTask POSTs body to notifyUrl to report the outcome of task
// taskId to a remote parent, retrying with exponential back-off (1<<tried
// seconds) on failure.
//
// tried is the number of attempts already made; callers pass 0. Retrying
// stops once tried exceeds MAX_REMOTE_NOTIFY_TRIES. The final outcome is
// recorded with saveTaskUpCallStatus.
//
// The response is kept in its own variable so that every retry sends the
// original body; the previous code overwrote the body parameter with the
// response of the failed request. A nil acknowledgement body is recorded as
// an empty string instead of being dereferenced.
func notifyRemoteTask(ctx context.Context, notifyUrl string, taskId string, header http.Header, body jsonutils.JSONObject, tried uint) {
	client := httputils.GetDefaultClient()
	for {
		_, resp, err := httputils.JSONRequest(client, ctx, "POST", notifyUrl, header, body, false)
		if err == nil {
			ack := ""
			if resp != nil {
				ack = resp.String()
			}
			log.Infof("Notify remote URL %s get acked: %s, taskId: %s", notifyUrl, ack, taskId)
			saveTaskUpCallStatus(taskId, notifyUrl, true, ack)
			return
		}

		log.Errorf("notifyRemoteTask fail %s", err)
		if tried > MAX_REMOTE_NOTIFY_TRIES {
			log.Errorf("notifyRemoteTask max tried reached, give up...")
			saveTaskUpCallStatus(taskId, notifyUrl, false, fmt.Sprintf("notifyRemoteTask max tried reached, give up... %s", err.Error()))
			return
		}
		time.Sleep(time.Second * time.Duration(1<<tried))
		tried++
	}
}
// NotifyParentTaskFailure notifies the parent task that this task failed. The
// notification body carries "__status__" = "error", the task name under
// "__task_name__" and reason under "__reason__".
func (task *STask) NotifyParentTaskFailure(ctx context.Context, reason jsonutils.JSONObject) {
	failure := jsonutils.NewDict()
	failure.Add(jsonutils.NewString("error"), "__status__")
	failure.Add(jsonutils.NewString(task.TaskName), "__task_name__")
	failure.Add(reason, "__reason__")

	const failed = true
	task.NotifyParentTaskComplete(ctx, failure, failed)
}
// IsCurrentStageComplete reports whether every sub-task registered for the
// current stage has left its initial state, i.e. there is at least one
// sub-task and none of them is still in init. As a side effect the task
// progress is updated to the percentage of sub-tasks that are no longer in
// init.
//
// Errors from the two count queries are ignored and leave the counts at their
// zero value. The progress is only updated when there is at least one
// sub-task: with a zero total the percentage would be 0/0, which is NaN in
// floating-point arithmetic.
func (task *STask) IsCurrentStageComplete() bool {
	totalSubtasksCnt, _ := SubTaskManager.GetTotalSubtasksCount(task.Id, task.Stage)
	initSubtasksCnt, _ := SubTaskManager.GetInitSubtasksCount(task.Id, task.Stage)
	log.Debugf("Task %s IsCurrentStageComplete totalSubtasks %d initSubtasks %d ", task.String(), totalSubtasksCnt, initSubtasksCnt)

	if !(totalSubtasksCnt > 0) {
		return false
	}
	task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) * 100 / float32(totalSubtasksCnt))
	return initSubtasksCnt == 0
}
// GetPendingUsage unmarshals the pending usage stored at position index of
// the task params into quota. quota is left untouched and nil is returned
// when no usage is stored at that position.
func (task *STask) GetPendingUsage(quota quotas.IQuota, index int) error {
	key := pendingUsageKey(index)
	if !task.Params.Contains(key) {
		return nil
	}
	stored, err := task.Params.Get(key)
	if err != nil {
		return errors.Wrapf(err, "task.Params.Get %s", key)
	}
	if err = stored.Unmarshal(quota); err != nil {
		return errors.Wrap(err, "quotaJson.Unmarshal")
	}
	return nil
}
// pendingUsageKey returns the Params key of the pending usage at position
// index: PENDING_USAGE_KEY itself for index <= 0, and PENDING_USAGE_KEY
// followed by "." and the decimal index otherwise.
func pendingUsageKey(index int) string {
	if index <= 0 {
		return PENDING_USAGE_KEY
	}
	return PENDING_USAGE_KEY + "." + strconv.Itoa(index)
}
// SetPendingUsage stores quota as the pending usage at position index of the
// task params, replacing any previous value. The update error is logged and
// returned.
func (task *STask) SetPendingUsage(quota quotas.IQuota, index int) error {
	storeUsage := func() error {
		key := pendingUsageKey(index)
		refreshed := task.Params.CopyExcludes(key)
		refreshed.Add(jsonutils.Marshal(quota), key)
		task.Params = refreshed
		return nil
	}
	_, err := db.Update(task, storeUsage)
	if err != nil {
		log.Errorf("set_pending_usage fail %s", err)
	}
	return err
}
// ClearPendingUsage removes the pending usage stored at position index from
// the task params. The update error is logged and returned.
func (task *STask) ClearPendingUsage(index int) error {
	dropUsage := func() error {
		task.Params = task.Params.CopyExcludes(pendingUsageKey(index))
		return nil
	}
	_, err := db.Update(task, dropUsage)
	if err != nil {
		log.Errorf("clear_pending_usage fail %s", err)
	}
	return err
}
// GetParams returns the params dict of the task (not a copy).
func (task *STask) GetParams() *jsonutils.JSONDict {
	params := task.Params
	return params
}
// GetUserCred returns the credential the task was created with.
func (task *STask) GetUserCred() mcclient.TokenCredential {
	cred := task.UserCred
	return cred
}
// GetTaskId returns the id of the task; it is an alias of GetId.
func (task *STask) GetTaskId() string {
	id := task.GetId()
	return id
}
// GetObject returns the object attached to the task by execITask for a
// single-object task; it is nil before a stage has been executed.
func (task *STask) GetObject() db.IStandaloneModel {
	obj := task.taskObject
	return obj
}
// GetObjects returns the objects attached to the task by execITask for a
// multi-object task; it is nil before a stage has been executed.
func (task *STask) GetObjects() []db.IStandaloneModel {
	objs := task.taskObjects
	return objs
}
// getTaskHeader builds the HTTP headers for requests made on behalf of the
// task: the token headers of the task credential (or of the admin credential
// when the task credential is no longer valid) plus the task id.
func (task *STask) getTaskHeader() http.Header {
	cred := task.GetUserCred()
	if !cred.IsValid() {
		cred = auth.AdminCredential()
	}
	hdr := mcclient.GetTokenHeaders(cred)
	hdr.Set(mcclient.TASK_ID, task.GetTaskId())
	return hdr
}
// GetTaskRequestHeader returns the headers of getTaskHeader and, when a
// service URL has been configured with SetServiceUrl, additionally the
// notify URL "<serviceUrl>/tasks/<task id>".
func (task *STask) GetTaskRequestHeader() http.Header {
	hdr := task.getTaskHeader()
	if len(serviceUrl) == 0 {
		return hdr
	}
	callback := fmt.Sprintf("%s/tasks/%s", serviceUrl, task.GetTaskId())
	hdr.Set(mcclient.TASK_NOTIFY_URL, callback)
	return hdr
}
// String renders the task as "<id>(<task name>,<stage>)".
func (task *STask) String() string {
	const layout = "%s(%s,%s)"
	return fmt.Sprintf(layout, task.Id, task.TaskName, task.Stage)
}
// serviceUrl is the base URL used by GetTaskRequestHeader to build the task
// notify URL; it stays empty until SetServiceUrl is called.
var serviceUrl string

// SetServiceUrl sets the base URL used to build task notify URLs.
func SetServiceUrl(url string) {
	serviceUrl = url
}
// GetStartTime returns the creation time of the task record (CreatedAt), not
// the StartAt value written by saveStartAt.
func (task *STask) GetStartTime() time.Time {
	created := task.CreatedAt
	return created
}
// QueryTasksOfObject builds a query for the tasks that operate on obj, newest
// first. It is the union of the tasks whose own obj_id is obj's id and of the
// multi-object tasks linked to obj through the task-object table.
//
// A non-zero since keeps only tasks created at or after it. isOpen, when not
// nil, keeps only unfinished tasks (true) or only tasks in stage "complete"
// or "failed" (false).
func (manager *STaskManager) QueryTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) *sqlchemy.SQuery {
	finishedStages := []string{"complete", "failed"}

	// Tasks that target obj directly.
	direct := manager.Query()
	direct = direct.Equals("obj_id", obj.GetId())
	direct = direct.Equals("obj_type", obj.Keyword())
	if !since.IsZero() {
		direct = direct.GE("created_at", since)
	}
	if isOpen != nil {
		if *isOpen {
			direct = direct.Filter(sqlchemy.NOT(
				sqlchemy.In(direct.Field("stage"), finishedStages),
			))
		} else {
			direct = direct.In("stage", finishedStages)
		}
	}

	// Multi-object tasks that include obj.
	batch := manager.Query()
	links := TaskObjectManager.TableSpec().Instance()
	batch = batch.Join(links, sqlchemy.AND(
		sqlchemy.Equals(links.Field("task_id"), batch.Field("id")),
		sqlchemy.Equals(links.Field("obj_id"), obj.GetId()),
	))
	batch = batch.Filter(sqlchemy.Equals(batch.Field("obj_id"), MULTI_OBJECTS_ID))
	batch = batch.Filter(sqlchemy.Equals(batch.Field("obj_type"), obj.Keyword()))
	if !since.IsZero() {
		batch = batch.Filter(sqlchemy.GE(batch.Field("created_at"), since))
	}
	if isOpen != nil {
		if *isOpen {
			batch = batch.Filter(sqlchemy.NOT(
				sqlchemy.In(batch.Field("stage"), finishedStages),
			))
		} else {
			batch = batch.In("stage", finishedStages)
		}
	}

	// The two queries cannot overlap because they put different conditions
	// on tasks_tbl.obj_id.
	return sqlchemy.Union(direct, batch).Query().Desc("created_at")
}
// IsInTask reports whether obj has an unfinished task (see
// FetchIncompleteTasksOfObject). A lookup error is treated as "in task".
func (manager *STaskManager) IsInTask(obj db.IStandaloneModel) bool {
	pending, err := manager.FetchIncompleteTasksOfObject(obj)
	return err != nil || len(pending) > 0
}
// FetchIncompleteTasksOfObject returns the unfinished tasks of obj that were
// created within the last hour.
func (manager *STaskManager) FetchIncompleteTasksOfObject(obj db.IStandaloneModel) ([]STask, error) {
	openOnly := true
	return manager.FetchTasksOfObjectLatest(obj, time.Hour, &openOnly)
}
// FetchTasksOfObjectLatest returns the tasks of obj created within the last
// interval, counted back from the current UTC time. isOpen has the meaning
// described on QueryTasksOfObject.
func (manager *STaskManager) FetchTasksOfObjectLatest(obj db.IStandaloneModel, interval time.Duration, isOpen *bool) ([]STask, error) {
	cutoff := timeutils.UtcNow().Add(-interval)
	return manager.FetchTasksOfObject(obj, cutoff, isOpen)
}
// FetchTasksOfObject runs the query built by QueryTasksOfObject and returns
// the matching tasks. A "no rows" outcome is not an error and yields an empty
// slice; any other error is returned with a nil slice.
//
// The "no rows" check unwraps the error with errors.Cause, like the other
// sql.ErrNoRows checks in this file, so a wrapped sql.ErrNoRows is recognized
// as well.
func (manager *STaskManager) FetchTasksOfObject(obj db.IStandaloneModel, since time.Time, isOpen *bool) ([]STask, error) {
	q := manager.QueryTasksOfObject(obj, since, isOpen)
	tasks := make([]STask, 0)
	err := db.FetchModelObjects(manager, q, &tasks)
	if err != nil && errors.Cause(err) != sql.ErrNoRows {
		return nil, err
	}
	return tasks, nil
}
// Task list.
//
// ListItemFilter applies the filters of a task list request to q: first the
// base-model and status-resource filters, then the task specific ones (ids,
// object ids/names/types, task names, stages, parent task, completion, init,
// multi-object and root flags). When input.SubTask is set, the total, failed
// and succeeded sub-task counts of every task are joined in as extra fields
// next to all columns of the task table.
//
// It returns the filtered query, or the wrapped error of the embedded manager
// whose filter failed.
func (manager *STaskManager) ListItemFilter(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	input apis.TaskListInput,
) (*sqlchemy.SQuery, error) {
	var err error
	q, err = manager.SModelBaseManager.ListItemFilter(ctx, q, userCred, input.ModelBaseListInput)
	if err != nil {
		// The message names the manager that was actually called.
		return q, errors.Wrap(err, "SModelBaseManager.ListItemFilter")
	}
	q, err = manager.SStatusResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StatusResourceBaseListInput)
	if err != nil {
		return q, errors.Wrap(err, "SStatusResourceBaseManager.ListItemFilter")
	}

	if len(input.Id) > 0 {
		q = q.In("id", input.Id)
	}
	if len(input.ObjId) > 0 {
		// Object ids are matched through the task-object table so that
		// multi-object tasks are found as well.
		taskObjQ := TaskObjectManager.Query("task_id").In("obj_id", input.ObjId).Distinct().SubQuery()
		q = q.Join(taskObjQ, sqlchemy.Equals(q.Field("id"), taskObjQ.Field("task_id")))
	}
	if len(input.ObjName) > 0 {
		q = q.In("object", input.ObjName)
	}
	if len(input.ObjType) > 0 {
		q = q.In("obj_type", input.ObjType)
	}
	if len(input.TaskName) > 0 {
		q = q.In("task_name", input.TaskName)
	}
	if len(input.Stage) > 0 {
		q = q.In("stage", input.Stage)
	}
	if len(input.NotStage) > 0 {
		q = q.NotIn("stage", input.NotStage)
	}
	if len(input.ParentId) > 0 {
		q = q.In("parent_task_id", input.ParentId)
	}

	if input.IsComplete != nil {
		if *input.IsComplete {
			q = q.In("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
		} else {
			q = q.NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
		}
	}
	if input.IsInit != nil {
		if *input.IsInit {
			q = q.Equals("stage", TASK_INIT_STAGE)
		} else {
			q = q.NotEquals("stage", TASK_INIT_STAGE)
		}
	}
	if input.IsMulti != nil {
		if *input.IsMulti {
			q = q.Equals("obj_id", MULTI_OBJECTS_ID)
		} else {
			q = q.NotEquals("obj_id", MULTI_OBJECTS_ID)
		}
	}
	if input.IsRoot != nil {
		if *input.IsRoot {
			q = q.IsNullOrEmpty("parent_task_id")
		} else {
			q = q.IsNotEmpty("parent_task_id")
		}
	}
	if len(input.ParentTaskId) > 0 {
		q = q.Equals("parent_task_id", input.ParentTaskId)
	}

	if input.SubTask != nil && *input.SubTask {
		// subSQFunc counts the sub-tasks per parent task, optionally limited
		// to one sub-task status, and names the count column cntField.
		subSQFunc := func(status string, cntField string) *sqlchemy.SSubQuery {
			subQ := SubTaskManager.Query()
			if len(status) > 0 {
				subQ = subQ.Equals("status", status)
			}
			subQ = subQ.GroupBy(subQ.Field("task_id"))
			subQ = subQ.AppendField(subQ.Field("task_id"))
			subQ = subQ.AppendField(sqlchemy.COUNT(cntField))
			return subQ.SubQuery()
		}

		{
			subSQ := subSQFunc("", "sub_task_count")
			q = q.LeftJoin(subSQ, sqlchemy.Equals(subSQ.Field("task_id"), q.Field("id")))
			q = q.AppendField(subSQ.Field("sub_task_count"))
		}
		{
			failSubSQ := subSQFunc(SUBTASK_FAIL, "fail_sub_task_cnt")
			q = q.LeftJoin(failSubSQ, sqlchemy.Equals(failSubSQ.Field("task_id"), q.Field("id")))
			q = q.AppendField(failSubSQ.Field("fail_sub_task_cnt"))
		}
		{
			succSubSQ := subSQFunc(SUBTASK_SUCC, "succ_sub_task_cnt")
			q = q.LeftJoin(succSubSQ, sqlchemy.Equals(succSubSQ.Field("task_id"), q.Field("id")))
			q = q.AppendField(succSubSQ.Field("succ_sub_task_cnt"))
		}

		// Fields were appended explicitly above, so every task column is
		// appended explicitly as well.
		for _, c := range manager.TableSpec().Columns() {
			q = q.AppendField(q.Field(c.Name()))
		}
	}

	// q.DebugQuery2("taskQuery")

	return q, nil
}
// ListItemExportKeys applies the export-key handling of
// manager.SModelBaseManager to q and returns the resulting query.
//
// On failure the returned query is nil and the error is wrapped with
// "SModelBaseManager.ListItemExportKeys". Only the SModelBaseManager step is
// applied; the SProjectizedResourceBaseManager step is not invoked here.
func (manager *STaskManager) ListItemExportKeys(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, keys stringutils2.SSortedStrings) (*sqlchemy.SQuery, error) {
	exportQ, err := manager.SModelBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
	if err != nil {
		return nil, errors.Wrap(err, "SModelBaseManager.ListItemExportKeys")
	}
	return exportQ, nil
}
// QueryDistinctExtraField asks manager.SModelBaseManager to resolve the
// distinct extra field on q.
//
// When the base manager succeeds its query is returned as is. When it fails,
// the query value it returned is passed back together with
// httperrors.ErrNotFound (the base manager's own error is not propagated).
// The SProjectizedResourceBaseManager step is not invoked here.
func (manager *STaskManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
	baseQ, err := manager.SModelBaseManager.QueryDistinctExtraField(q, field)
	if err != nil {
		return baseQ, httperrors.ErrNotFound
	}
	return baseQ, nil
}
// ResourceScope returns the RBAC scope of the task resource by delegating to
// manager.SProjectizedResourceBaseManager.
func (manager *STaskManager) ResourceScope() rbacscope.TRbacScope {
	scope := manager.SProjectizedResourceBaseManager.ResourceScope()
	return scope
}
// FetchCustomizeColumns builds one apis.TaskDetails per entry of objs.
//
// Each row combines, index by index, the details produced by
// manager.SModelBaseManager and by manager.SProjectizedResourceBaseManager
// for the same arguments. The result has exactly len(objs) elements.
func (manager *STaskManager) FetchCustomizeColumns(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
	objs []interface{},
	fields stringutils2.SSortedStrings,
	isList bool,
) []apis.TaskDetails {
	baseDetails := manager.SModelBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
	projDetails := manager.SProjectizedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)

	details := make([]apis.TaskDetails, 0, len(objs))
	for idx := 0; idx < len(objs); idx++ {
		details = append(details, apis.TaskDetails{
			ModelBaseDetails:        baseDetails[idx],
			ProjectizedResourceInfo: projDetails[idx],
		})
	}
	return details
}
// OrderByExtraFields applies the extra-field ordering of
// manager.SModelBaseManager, driven by query.ModelBaseListInput, to q.
//
// On failure the query value returned by the base manager is passed back
// together with the error wrapped as "SModelBaseManager.OrderByExtraField".
// The SProjectizedResourceBaseManager step is not invoked here.
func (manager *STaskManager) OrderByExtraFields(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	query apis.TaskListInput,
) (*sqlchemy.SQuery, error) {
	orderedQ, err := manager.SModelBaseManager.OrderByExtraFields(ctx, q, userCred, query.ModelBaseListInput)
	if err != nil {
		return orderedQ, errors.Wrap(err, "SModelBaseManager.OrderByExtraField")
	}
	return orderedQ, nil
}
// SetProgressAndStatus stores status and progress on the task in a single
// db.Update call. The status value is set before the progress value.
// A failure of db.Update is returned wrapped as "Update".
func (task *STask) SetProgressAndStatus(progress float32, status string) error {
	apply := func() error {
		task.SetStatusValue(status)
		task.SetProgressValue(progress)
		return nil
	}
	if _, err := db.Update(task, apply); err != nil {
		return errors.Wrap(err, "Update")
	}
	return nil
}
// SetProgress stores progress on the task through db.Update.
// A failure of db.Update is returned wrapped as "Update".
func (task *STask) SetProgress(progress float32) error {
	apply := func() error {
		task.SetProgressValue(progress)
		return nil
	}
	if _, err := db.Update(task, apply); err != nil {
		return errors.Wrap(err, "Update")
	}
	return nil
}
// InitializeData runs the manager's data-initialization steps in a fixed
// order: failTimeoutTasks, migrateObjectInfo, clearnUpSubtasks and
// clearnUpTaskObjects.
//
// Execution stops at the first step that fails; its error is returned wrapped
// with the name of that step.
func (manager *STaskManager) InitializeData() error {
	steps := []struct {
		name string
		run  func() error
	}{
		{"failTimeoutTasks", manager.failTimeoutTasks},
		{"migrateObjectInfo", manager.migrateObjectInfo},
		{"clearnUpSubtasks", manager.clearnUpSubtasks},
		{"clearnUpTaskObjects", manager.clearnUpTaskObjects},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			return errors.Wrap(err, step.name)
		}
	}
	return nil
}
// failTimeoutTasks loads every task whose stage is neither TASK_STAGE_FAILED
// nor TASK_STAGE_COMPLETE and hands each one to execTaskObject with a payload
// of {"__reason__": "service restart", "__status__": "error"}.
//
// Only a failure to fetch the tasks is reported; whatever execTaskObject
// returns (if anything) is not inspected.
func (manager *STaskManager) failTimeoutTasks() error {
	unfinishedQ := manager.Query().NotIn("stage", []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
	pending := []STask{}
	if err := db.FetchModelObjects(manager, unfinishedQ, &pending); err != nil {
		return errors.Wrap(err, "FetchModelObjects")
	}

	// The same payload object is shared by all tasks.
	payload := jsonutils.NewDict()
	payload.Add(jsonutils.NewString("service restart"), "__reason__")
	payload.Add(jsonutils.NewString("error"), "__status__")

	for idx := 0; idx < len(pending); idx++ {
		// Work on the slice element itself, not on a copy.
		current := &pending[idx]
		current.fixParams()
		manager.execTaskObject(current, payload)
	}
	return nil
}
// clearnUpSubtasks removes rows that reference tasks which no longer exist.
//
// It selects the distinct task_id values of the sub-task table that have no
// matching id in the task table (left join + IS NULL) and, for each of them,
// deletes the rows with that task_id from both the sub-task table and the
// task-object table. Start and end (with elapsed seconds) are logged.
//
// The task_id is passed to Exec as a bound parameter instead of being spliced
// into the SQL text, matching what doTaskCleanupJob does. An error hit while
// iterating the result set is reported instead of being treated as end of
// data.
func (manager *STaskManager) clearnUpSubtasks() error {
	start := time.Now()
	log.Infof("start clearnUpSubtasks")
	defer func() {
		log.Infof("end clearnUpSubtasks, takes %f seconds", time.Since(start).Seconds())
	}()

	q := SubTaskManager.Query("task_id")
	tasksQ := TaskManager.Query().SubQuery()
	q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
	q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
	q = q.Distinct()

	rows, err := q.Rows()
	if err != nil {
		return errors.Wrap(err, "q.Rows")
	}
	defer rows.Close()

	// The statements do not depend on the row, so build them once.
	delSubTasks := fmt.Sprintf("delete from `%s` where task_id = ?", SubTaskManager.TableSpec().Name())
	delTaskObjs := fmt.Sprintf("delete from `%s` where task_id = ?", TaskObjectManager.TableSpec().Name())

	for rows.Next() {
		val, err := q.Row2Map(rows)
		if err != nil {
			return errors.Wrap(err, "Row2Map")
		}
		taskId := val["task_id"]

		log.Infof("%s [task_id=%v]", delSubTasks, taskId)
		_, err = SubTaskManager.TableSpec().GetTableSpec().Database().Exec(delSubTasks, taskId)
		if err != nil {
			return errors.Wrap(err, "exec")
		}

		log.Infof("%s [task_id=%v]", delTaskObjs, taskId)
		_, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(delTaskObjs, taskId)
		if err != nil {
			return errors.Wrap(err, "exec")
		}
	}
	// rows.Next() returns false both at end of data and on error.
	if err := rows.Err(); err != nil {
		return errors.Wrap(err, "rows.Err")
	}
	return nil
}
// clearnUpTaskObjects removes task-object rows that reference tasks which no
// longer exist.
//
// It selects the distinct task_id values of the task-object table that have
// no matching id in the task table (left join + IS NULL) and deletes the
// task-object rows carrying each of those ids. Start and end (with elapsed
// seconds) are logged.
//
// The task_id is passed to Exec as a bound parameter instead of being spliced
// into the SQL text, matching what doTaskCleanupJob does. An error hit while
// iterating the result set is reported instead of being treated as end of
// data.
func (manager *STaskManager) clearnUpTaskObjects() error {
	start := time.Now()
	log.Infof("start clearnUpTaskObjects")
	defer func() {
		log.Infof("end clearnUpTaskObjects, takes %f seconds", time.Since(start).Seconds())
	}()

	q := TaskObjectManager.Query("task_id")
	tasksQ := TaskManager.Query().SubQuery()
	q = q.LeftJoin(tasksQ, sqlchemy.Equals(q.Field("task_id"), tasksQ.Field("id")))
	q = q.Filter(sqlchemy.IsNull(tasksQ.Field("id")))
	q = q.Distinct()

	rows, err := q.Rows()
	if err != nil {
		return errors.Wrap(err, "q.Rows")
	}
	defer rows.Close()

	// The statement does not depend on the row, so build it once.
	delTaskObjs := fmt.Sprintf("delete from `%s` where task_id = ?", TaskObjectManager.TableSpec().Name())

	for rows.Next() {
		val, err := q.Row2Map(rows)
		if err != nil {
			return errors.Wrap(err, "Row2Map")
		}
		taskId := val["task_id"]

		log.Infof("%s [task_id=%v]", delTaskObjs, taskId)
		_, err = TaskObjectManager.TableSpec().GetTableSpec().Database().Exec(delTaskObjs, taskId)
		if err != nil {
			return errors.Wrap(err, "exec")
		}
	}
	// rows.Next() returns false both at end of data and on error.
	if err := rows.Err(); err != nil {
		return errors.Wrap(err, "rows.Err")
	}
	return nil
}
// migrateObjectInfo creates the missing task-object row for single-object
// tasks.
//
// It selects, ordered by created_at ascending, every task whose obj_id is not
// MULTI_OBJECTS_ID and that has no row in the task-object table (left join +
// IS NULL). For each such task it inserts one STaskObject carrying the task's
// ObjId, Object, Id (as TaskId), DomainId and ProjectId. Start and end (with
// elapsed seconds) are logged.
//
// The first failure aborts the migration. An error hit while iterating the
// result set is reported instead of being treated as end of data.
func (manager *STaskManager) migrateObjectInfo() error {
	start := time.Now()
	log.Infof("start migrateObjectInfo")
	defer func() {
		log.Infof("end migrateObjectInfo, takes %f seconds", time.Since(start).Seconds())
	}()

	q := manager.Query().NotEquals("obj_id", MULTI_OBJECTS_ID)
	// Named differently from the per-row STaskObject below to avoid shadowing.
	taskObjSQ := TaskObjectManager.Query().SubQuery()
	q = q.LeftJoin(taskObjSQ, sqlchemy.Equals(q.Field("id"), taskObjSQ.Field("task_id")))
	q = q.Filter(sqlchemy.IsNull(taskObjSQ.Field("task_id")))
	q = q.Asc("created_at")
	// q.DebugQuery2("migrateObjectInfo")

	rows, err := q.Rows()
	if err != nil {
		return errors.Wrap(err, "query.Rows")
	}
	defer rows.Close()

	for rows.Next() {
		task := STask{}
		task.SetModelManager(manager, &task)
		if err := q.Row2Struct(rows, &task); err != nil {
			return errors.Wrap(err, "row2struct")
		}

		taskObj := STaskObject{}
		taskObj.ObjId = task.ObjId
		taskObj.Object = task.Object
		taskObj.TaskId = task.Id
		taskObj.DomainId = task.DomainId
		taskObj.ProjectId = task.ProjectId
		taskObj.SetModelManager(TaskObjectManager, &taskObj)

		if err := TaskObjectManager.TableSpec().Insert(ctx.CtxWithTime(), &taskObj); err != nil {
			return errors.Wrap(err, "Insert taskObject")
		}
	}
	// rows.Next() returns false both at end of data and on error.
	if err := rows.Err(); err != nil {
		return errors.Wrap(err, "rows.Err")
	}
	return nil
}
// taskCleanuoWorkerManager is the worker manager on which sTaskCleanupJob
// instances are run. It is registered under the name
// "taskCleanupWorkerManager".
// NOTE(review): the remaining arguments (1, 1024, true) are presumably the
// worker count, the queue backlog and a db-worker flag; confirm against
// appsrv.NewWorkerManager.
var taskCleanuoWorkerManager = appsrv.NewWorkerManager("taskCleanupWorkerManager", 1, 1024, true)
// TaskCleanupJob submits a new sTaskCleanupJob to taskCleanuoWorkerManager.
// The ctx, userCred and isStart arguments are not used.
func (manager *STaskManager) TaskCleanupJob(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
	job := &sTaskCleanupJob{}
	taskCleanuoWorkerManager.Run(job, nil, nil)
}
// sTaskCleanupJob is the stateless job value that is submitted to
// taskCleanuoWorkerManager; its Run method performs one cleanup batch.
type sTaskCleanupJob struct{}
// Run performs one cleanup batch through TaskManager.doTaskCleanupJob.
//
// If the batch fails the error is logged and nothing else happens. If the
// batch handled at least one task, another sTaskCleanupJob is submitted to
// taskCleanuoWorkerManager so that the next batch follows.
func (job *sTaskCleanupJob) Run() {
	handled, err := TaskManager.doTaskCleanupJob()
	switch {
	case err != nil:
		log.Errorf("doTaskCleanupJob fail %s", err)
	case handled > 0:
		taskCleanuoWorkerManager.Run(&sTaskCleanupJob{}, nil, nil)
	}
}
// Dump returns the fixed textual description of the job.
func (job *sTaskCleanupJob) Dump() string {
	const description = "TaskCleanupJob"
	return description
}
// doTaskCleanupJob archives one batch of old tasks and returns how many tasks
// were handled.
//
// Tasks created earlier than now minus consts.TaskArchiveThresholdHours()
// hours are selected oldest first; when consts.TaskArchiveBatchLimit() is
// positive the selection is limited to that many rows. Each selected task is
// inserted through ArchivedTaskManager and then its rows are deleted from the
// task table, the task-object table and the sub-task table.
//
// The first failure is logged and aborts the batch with a count of 0. An
// error hit while iterating the result set is reported as well, instead of
// being treated as end of data.
func (manager *STaskManager) doTaskCleanupJob() (int, error) {
	// NOTE(review): the plain string key is kept as is; code reading this
	// context value elsewhere would have to use the same key. Confirm before
	// changing it to a typed key.
	ctx := context.WithValue(context.Background(), "task_cleanup_job", true)

	threshold := time.Now().Add(-time.Duration(consts.TaskArchiveThresholdHours()) * time.Hour)
	q := manager.Query().LT("created_at", threshold).Asc("created_at")
	if consts.TaskArchiveBatchLimit() > 0 {
		q = q.Limit(consts.TaskArchiveBatchLimit())
	}

	rows, err := q.Rows()
	if err != nil {
		log.Errorf("query rows fail %s", err)
		return 0, errors.Wrap(err, "query rows")
	}
	defer rows.Close()

	// The statements are the same for every task; only the bound id changes.
	cleanupStmts := []string{
		fmt.Sprintf("DELETE FROM `%s` WHERE id = ?", manager.TableSpec().Name()),
		fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", TaskObjectManager.TableSpec().Name()),
		fmt.Sprintf("DELETE FROM `%s` WHERE task_id = ?", SubTaskManager.TableSpec().Name()),
	}

	taskStart := time.Now()
	count := 0
	for rows.Next() {
		task := STask{}
		err := q.Row2Struct(rows, &task)
		if err != nil {
			log.Errorf("Row2Struct fail %s", err)
			return 0, errors.Wrap(err, "row2struct")
		}

		// Re-bind the loaded row to the archive manager before inserting it.
		task.SetModelManager(ArchivedTaskManager, &task)
		err = ArchivedTaskManager.Insert(ctx, &task)
		if err != nil {
			log.Errorf("insert archive fail %s", err)
			return 0, errors.Wrap(err, "insert archive")
		}

		// cleanup
		for _, stmt := range cleanupStmts {
			_, err := manager.TableSpec().GetTableSpec().Database().Exec(stmt, task.Id)
			if err != nil {
				log.Errorf("exec %s %s fail: %s", stmt, task.Id, err)
				return 0, errors.Wrap(err, "exec")
			}
		}
		count++
	}
	// rows.Next() returns false both at end of data and on error.
	if err := rows.Err(); err != nil {
		log.Errorf("iterate rows fail %s", err)
		return 0, errors.Wrap(err, "rows.Err")
	}

	log.Infof("TaskCleanupJob migrate %d tasks, takes %f seconds, batch limit=%d threshold hours=%d", count, time.Since(taskStart).Seconds(), consts.TaskArchiveBatchLimit(), consts.TaskArchiveThresholdHours())
	return count, nil
}
// PerformCancel cancels the task.
//
// A task whose stage is TASK_STAGE_FAILED or TASK_STAGE_COMPLETE is rejected
// with errors.ErrInvalidStatus. Otherwise task.cancel is invoked with a nil
// reason; its failure is returned wrapped as "cancel". The query and input
// arguments are not used, and the JSON result is always nil.
func (task *STask) PerformCancel(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
	input apis.TaskCancelInput,
) (jsonutils.JSONObject, error) {
	finalStages := []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE}
	if utils.IsInArray(task.Stage, finalStages) {
		return nil, errors.Wrapf(errors.ErrInvalidStatus, "cannot cancel stage in %s", task.Stage)
	}
	if err := task.cancel(ctx, nil); err != nil {
		return nil, errors.Wrap(err, "cancel")
	}
	return nil, nil
}
// fetchSubTasks returns the tasks whose parent_task_id equals this task's Id,
// queried through the task's own model manager.
// A fetch failure is returned wrapped as "db.FetchModelObjects".
func (task *STask) fetchSubTasks() ([]STask, error) {
	mgr := task.GetModelManager()
	childQ := mgr.Query().Equals("parent_task_id", task.Id)

	children := []STask{}
	if err := db.FetchModelObjects(mgr, childQ, &children); err != nil {
		return nil, errors.Wrap(err, "db.FetchModelObjects")
	}
	return children, nil
}
// cancel cancels this task and, before it, all of its sub-tasks.
//
// A task whose stage is already TASK_STAGE_FAILED or TASK_STAGE_COMPLETE is
// left alone. When reason is nil a default payload of
// {"__reason__": "cancel", "__status__": "error"} is used; the same payload
// is passed down to every sub-task. Sub-tasks are cancelled recursively
// first, and the first failure aborts the whole operation. Finally the task
// itself is passed to TaskManager.execTask together with the payload.
func (task *STask) cancel(ctx context.Context, reason *jsonutils.JSONDict) error {
	alreadyFinished := utils.IsInArray(task.Stage, []string{TASK_STAGE_FAILED, TASK_STAGE_COMPLETE})
	if alreadyFinished {
		return nil
	}

	if reason == nil {
		reason = jsonutils.NewDict()
		reason.Add(jsonutils.NewString("cancel"), "__reason__")
		reason.Add(jsonutils.NewString("error"), "__status__")
	}

	children, err := task.fetchSubTasks()
	if err != nil {
		return errors.Wrap(err, "fetchSubTasks")
	}
	for idx := 0; idx < len(children); idx++ {
		if cancelErr := children[idx].cancel(ctx, reason); cancelErr != nil {
			return errors.Wrap(cancelErr, "cancelTask")
		}
	}

	// fixParams must run before the task is handed to execTask.
	task.fixParams()
	TaskManager.execTask(task.GetTaskId(), reason)
	return nil
}