diff --git a/pkg/cloudcommon/db/taskman/coordinator.go b/pkg/cloudcommon/db/taskman/coordinator.go index a4e03744ea..2bed1a559b 100644 --- a/pkg/cloudcommon/db/taskman/coordinator.go +++ b/pkg/cloudcommon/db/taskman/coordinator.go @@ -8,6 +8,7 @@ import ( "yunion.io/x/log" "yunion.io/x/pkg/gotypes" + "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/cloudcommon/db" ) @@ -37,7 +38,7 @@ func init() { taskTable = make(map[string]reflect.Type) } -func RegisterTask(task interface{}) { +func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) { taskName := gotypes.GetInstanceTypeName(task) if _, ok := taskTable[taskName]; ok { log.Fatalf("Task %s already registered!", taskName) @@ -45,6 +46,13 @@ func RegisterTask(task interface{}) { taskType := reflect.Indirect(reflect.ValueOf(task)).Type() taskTable[taskName] = taskType // log.Infof("Task %s registerd", taskName) + if workerMan != nil { + taskWorkerTable[taskName] = workerMan + } +} + +func RegisterTask(task interface{}) { + RegisterTaskAndWorker(task, nil) } func isTaskExist(taskName string) bool { diff --git a/pkg/cloudcommon/db/taskman/handler.go b/pkg/cloudcommon/db/taskman/handler.go index 479d0234fe..37c5ebfeba 100644 --- a/pkg/cloudcommon/db/taskman/handler.go +++ b/pkg/cloudcommon/db/taskman/handler.go @@ -1,26 +1,12 @@ package taskman import ( - "yunion.io/x/jsonutils" "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/appsrv/dispatcher" - "yunion.io/x/onecloud/pkg/cloudcommon/db" ) -var taskWorkMan *appsrv.SWorkerManager - -func init() { - taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 1024, true) -} - func AddTaskHandler(prefix string, app *appsrv.Application) { handler := db.NewModelHandler(TaskManager) dispatcher.AddModelDispatcher(prefix, app, handler) } - -func runTask(taskId string, data jsonutils.JSONObject) { - taskWorkMan.Run(func() { - TaskManager.execTask(taskId, data) - }, nil, nil) -} diff --git a/pkg/cloudcommon/db/taskman/tasks.go b/pkg/cloudcommon/db/taskman/tasks.go index 26d5952e28..eaafcd9406 100644 --- a/pkg/cloudcommon/db/taskman/tasks.go +++ b/pkg/cloudcommon/db/taskman/tasks.go @@ -287,6 +287,14 @@ func (manager *STaskManager) fetchTask(idStr string) *STask { return task } +func (manager *STaskManager) getTaskName(taskId string) string { + baseTask := manager.fetchTask(taskId) + if baseTask == nil { + return "" + } + return baseTask.TaskName +} + func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) { baseTask := manager.fetchTask(taskId) if baseTask == nil { diff --git a/pkg/cloudcommon/db/taskman/worker.go b/pkg/cloudcommon/db/taskman/worker.go new file mode 100644 index 0000000000..7229ba2ddd --- /dev/null +++ b/pkg/cloudcommon/db/taskman/worker.go @@ -0,0 +1,31 @@ +package taskman + +import ( + "yunion.io/x/jsonutils" + "yunion.io/x/log" + + "yunion.io/x/onecloud/pkg/appsrv" +) + +var taskWorkMan *appsrv.SWorkerManager +var taskWorkerTable map[string]*appsrv.SWorkerManager + +func init() { + taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 1024, true) + taskWorkerTable = make(map[string]*appsrv.SWorkerManager) +} + +func runTask(taskId string, data jsonutils.JSONObject) { + taskName := TaskManager.getTaskName(taskId) + if len(taskName) == 0 { + log.Errorf("no such task??? task_id=%s", taskId) + return + } + worker := taskWorkMan + if workerMan, ok := taskWorkerTable[taskName]; ok { + worker = workerMan + } + worker.Run(func() { + TaskManager.execTask(taskId, data) + }, nil, nil) +} diff --git a/pkg/compute/tasks/cloud_provider_sync_info_task.go b/pkg/compute/tasks/cloud_provider_sync_info_task.go index b69e3a000a..7fcaaa986f 100644 --- a/pkg/compute/tasks/cloud_provider_sync_info_task.go +++ b/pkg/compute/tasks/cloud_provider_sync_info_task.go @@ -6,13 +6,15 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" + "yunion.io/x/pkg/utils" + + "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/cloudprovider" "yunion.io/x/onecloud/pkg/compute/models" "yunion.io/x/onecloud/pkg/compute/skus" "yunion.io/x/onecloud/pkg/util/logclient" - "yunion.io/x/pkg/utils" ) type CloudProviderSyncInfoTask struct { @@ -20,7 +22,8 @@ type CloudProviderSyncInfoTask struct { } func init() { - taskman.RegisterTask(CloudProviderSyncInfoTask{}) + syncWorker := appsrv.NewWorkerManager("CloudProviderSyncInfoTaskWorkerManager", 2, 512, true) + taskman.RegisterTaskAndWorker(CloudProviderSyncInfoTask{}, syncWorker) } func getAction(params *jsonutils.JSONDict) string { diff --git a/pkg/compute/tasks/guest_start_task.go b/pkg/compute/tasks/guest_start_task.go index f981746c35..899aec02c7 100644 --- a/pkg/compute/tasks/guest_start_task.go +++ b/pkg/compute/tasks/guest_start_task.go @@ -25,7 +25,7 @@ func (self *GuestStartTask) OnInit(ctx context.Context, obj db.IStandaloneModel, } func (self *GuestStartTask) checkTemplate(ctx context.Context, guest *models.SGuest) { - diskCat := guest.CategorizeDisks() + /*diskCat := guest.CategorizeDisks() if diskCat.Root != nil && len(diskCat.Root.GetTemplateId()) > 0 { if len(guest.BackupHostId) > 0 { self.SetStage("OnMasterHostTemplateReady", nil) @@ -35,7 +35,8 @@ func (self *GuestStartTask) checkTemplate(ctx context.Context, guest *models.SGu guest.GetDriver().CheckDiskTemplateOnStorage(ctx, self.UserCred, diskCat.Root.GetTemplateId(), diskCat.Root.StorageId, self) } else { self.startStart(ctx, guest) - } + }*/ + self.startStart(ctx, guest) } func (self *GuestStartTask) OnMasterHostTemplateReady(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) { diff --git a/pkg/image/tasks/image_check_task.go b/pkg/image/tasks/image_check_task.go index f39b0f5e33..c0b103a031 100644 --- a/pkg/image/tasks/image_check_task.go +++ b/pkg/image/tasks/image_check_task.go @@ -5,6 +5,7 @@ import ( "yunion.io/x/jsonutils" + "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/image/models" @@ -15,7 +16,8 @@ type ImageCheckTask struct { } func init() { - taskman.RegisterTask(ImageCheckTask{}) + checkWorker := appsrv.NewWorkerManager("ImageCheckTaskWorkerManager", 2, 1024, true) + taskman.RegisterTaskAndWorker(ImageCheckTask{}, checkWorker) } func (self *ImageCheckTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) { diff --git a/pkg/image/tasks/image_convert_task.go b/pkg/image/tasks/image_convert_task.go index 8e95736c79..af3271a235 100644 --- a/pkg/image/tasks/image_convert_task.go +++ b/pkg/image/tasks/image_convert_task.go @@ -5,6 +5,7 @@ import ( "yunion.io/x/jsonutils" + "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/image/models" @@ -15,7 +16,8 @@ type ImageConvertTask struct { } func init() { - taskman.RegisterTask(ImageConvertTask{}) + convertWorker := appsrv.NewWorkerManager("ImageConvertTaskWorkerManager", 2, 512, true) + taskman.RegisterTaskAndWorker(ImageConvertTask{}, convertWorker) } func (self *ImageConvertTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {