Automatic merge from release/2.0.0 -> release/2.1.0

* commit 'a5dfd52413ab09c3842320a210badfeee56d703b':
  修复parallel task未初始化subtask
This commit is contained in:
邱剑
2018-10-17 16:58:53 +08:00
3 changed files with 36 additions and 13 deletions

View File

@@ -11,7 +11,7 @@ import (
var taskWorkMan *appsrv.WorkerManager
func init() {
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 10)
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 100)
}
func AddTaskHandler(prefix string, app *appsrv.Application) {

View File

@@ -37,6 +37,7 @@ type SSubTask struct {
func (manager *SSubTaskmanager) GetSubTask(ptaskId string, subtaskId string) *SSubTask {
subtask := SSubTask{}
subtask.SetModelManager(manager)
err := manager.Query().Equals("task_id", ptaskId).Equals("subtask_id", subtaskId).First(&subtask)
if err != nil {
if err != sql.ErrNoRows {
@@ -47,9 +48,12 @@ func (manager *SSubTaskmanager) GetSubTask(ptaskId string, subtaskId string) *SS
return &subtask
}
func (manager *SSubTaskmanager) GetInitSubtasks(taskId string, stage string) []SSubTask {
func (manager *SSubTaskmanager) GetTotalSubtasks(taskId string, stage string, status string) []SSubTask {
subtasks := make([]SSubTask, 0)
q := manager.Query().Equals("task_id", taskId).Equals("stage", stage).Equals("status", SUBTASK_INIT)
q := manager.Query().Equals("task_id", taskId).Equals("stage", stage)
if len(status) > 0 {
q = q.Equals("status", status)
}
err := db.FetchModelObjects(manager, q, &subtasks)
if err != nil {
log.Errorf("GetInitSubtasks fail %s", err)
@@ -58,6 +62,10 @@ func (manager *SSubTaskmanager) GetInitSubtasks(taskId string, stage string) []S
return subtasks
}
func (manager *SSubTaskmanager) GetInitSubtasks(taskId string, stage string) []SSubTask {
return manager.GetTotalSubtasks(taskId, stage, SUBTASK_INIT)
}
func (self *SSubTask) SaveResults(failed bool, result jsonutils.JSONObject) error {
_, err := self.GetModelManager().TableSpec().Update(self, func() error {
if failed {

View File

@@ -196,6 +196,15 @@ func (manager *STaskManager) NewTask(ctx context.Context, taskName string, obj d
log.Errorf("Task insert error %s", err)
return nil, err
}
parentTask := task.GetParentTask()
if parentTask != nil {
st := SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
err := SubTaskManager.TableSpec().Insert(&st)
if err != nil {
log.Errorf("Subtask insert error %s", err)
return nil, err
}
}
return &task, nil
}
@@ -516,14 +525,19 @@ func (self *STask) NotifyParentTaskComplete(ctx context.Context, body *jsonutils
if subTask != nil {
subTask.SaveResults(failed, body)
}
pTask := TaskManager.fetchTask(parentTaskId)
if pTask == nil {
log.Errorf("Parent task %s not found", parentTaskId)
return
}
if pTask.IsCurrentStageComplete() {
pTask.ScheduleRun(body)
}
func() {
lockman.LockRawObject(ctx, "tasks", parentTaskId)
defer lockman.ReleaseRawObject(ctx, "tasks", parentTaskId)
pTask := TaskManager.fetchTask(parentTaskId)
if pTask == nil {
log.Errorf("Parent task %s not found", parentTaskId)
return
}
if pTask.IsCurrentStageComplete() {
pTask.ScheduleRun(body)
}
}()
}
if len(parentTaskNotify) > 0 {
notifyRemoteTask(ctx, parentTaskNotify, parentTaskId, body, 0)
@@ -560,8 +574,9 @@ func (self *STask) NotifyParentTaskFailure(ctx context.Context, reason string) {
}
func (self *STask) IsCurrentStageComplete() bool {
subtasks := SubTaskManager.GetInitSubtasks(self.Id, self.Stage)
if len(subtasks) == 0 {
totalSubtasks := SubTaskManager.GetTotalSubtasks(self.Id, self.Stage, "")
initSubtasks := SubTaskManager.GetInitSubtasks(self.Id, self.Stage)
if len(totalSubtasks) > 0 && len(initSubtasks) == 0 {
return true
} else {
return false