use separate worker for long running tasks, such as ImageConvertTask,

ImageCheckTask, CloudProviderSyncInfoTask
This commit is contained in:
Qiu Jian
2018-12-27 16:19:24 +08:00
parent a99ded14e3
commit df41b7cdcd
8 changed files with 62 additions and 21 deletions

View File

@@ -8,6 +8,7 @@ import (
"yunion.io/x/log"
"yunion.io/x/pkg/gotypes"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
)
@@ -37,7 +38,7 @@ func init() {
taskTable = make(map[string]reflect.Type)
}
func RegisterTask(task interface{}) {
func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) {
taskName := gotypes.GetInstanceTypeName(task)
if _, ok := taskTable[taskName]; ok {
log.Fatalf("Task %s already registered!", taskName)
@@ -45,6 +46,13 @@ func RegisterTask(task interface{}) {
taskType := reflect.Indirect(reflect.ValueOf(task)).Type()
taskTable[taskName] = taskType
// log.Infof("Task %s registerd", taskName)
if workerMan != nil {
taskWorkerTable[taskName] = workerMan
}
}
func RegisterTask(task interface{}) {
RegisterTaskAndWorker(task, nil)
}
func isTaskExist(taskName string) bool {

View File

@@ -1,26 +1,12 @@
package taskman
import (
"yunion.io/x/jsonutils"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/appsrv/dispatcher"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
)
var taskWorkMan *appsrv.SWorkerManager
func init() {
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 1024, true)
}
func AddTaskHandler(prefix string, app *appsrv.Application) {
handler := db.NewModelHandler(TaskManager)
dispatcher.AddModelDispatcher(prefix, app, handler)
}
func runTask(taskId string, data jsonutils.JSONObject) {
taskWorkMan.Run(func() {
TaskManager.execTask(taskId, data)
}, nil, nil)
}

View File

@@ -287,6 +287,14 @@ func (manager *STaskManager) fetchTask(idStr string) *STask {
return task
}
func (manager *STaskManager) getTaskName(taskId string) string {
baseTask := manager.fetchTask(taskId)
if baseTask == nil {
return ""
}
return baseTask.TaskName
}
func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
baseTask := manager.fetchTask(taskId)
if baseTask == nil {

View File

@@ -0,0 +1,31 @@
package taskman
import (
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/onecloud/pkg/appsrv"
)
var taskWorkMan *appsrv.SWorkerManager
var taskWorkerTable map[string]*appsrv.SWorkerManager
func init() {
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 1024, true)
taskWorkerTable = make(map[string]*appsrv.SWorkerManager)
}
func runTask(taskId string, data jsonutils.JSONObject) {
taskName := TaskManager.getTaskName(taskId)
if len(taskName) == 0 {
log.Errorf("no such task??? task_id=%s", taskId)
return
}
worker := taskWorkMan
if workerMan, ok := taskWorkerTable[taskName]; ok {
worker = workerMan
}
worker.Run(func() {
TaskManager.execTask(taskId, data)
}, nil, nil)
}

View File

@@ -6,13 +6,15 @@ import (
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/utils"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudprovider"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/compute/skus"
"yunion.io/x/onecloud/pkg/util/logclient"
"yunion.io/x/pkg/utils"
)
type CloudProviderSyncInfoTask struct {
@@ -20,7 +22,8 @@ type CloudProviderSyncInfoTask struct {
}
func init() {
taskman.RegisterTask(CloudProviderSyncInfoTask{})
syncWorker := appsrv.NewWorkerManager("CloudProviderSyncInfoTaskWorkerManager", 2, 512, true)
taskman.RegisterTaskAndWorker(CloudProviderSyncInfoTask{}, syncWorker)
}
func getAction(params *jsonutils.JSONDict) string {

View File

@@ -25,7 +25,7 @@ func (self *GuestStartTask) OnInit(ctx context.Context, obj db.IStandaloneModel,
}
func (self *GuestStartTask) checkTemplate(ctx context.Context, guest *models.SGuest) {
diskCat := guest.CategorizeDisks()
/*diskCat := guest.CategorizeDisks()
if diskCat.Root != nil && len(diskCat.Root.GetTemplateId()) > 0 {
if len(guest.BackupHostId) > 0 {
self.SetStage("OnMasterHostTemplateReady", nil)
@@ -35,7 +35,8 @@ func (self *GuestStartTask) checkTemplate(ctx context.Context, guest *models.SGu
guest.GetDriver().CheckDiskTemplateOnStorage(ctx, self.UserCred, diskCat.Root.GetTemplateId(), diskCat.Root.StorageId, self)
} else {
self.startStart(ctx, guest)
}
}*/
self.startStart(ctx, guest)
}
func (self *GuestStartTask) OnMasterHostTemplateReady(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {

View File

@@ -5,6 +5,7 @@ import (
"yunion.io/x/jsonutils"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/image/models"
@@ -15,7 +16,8 @@ type ImageCheckTask struct {
}
func init() {
taskman.RegisterTask(ImageCheckTask{})
checkWorker := appsrv.NewWorkerManager("ImageCheckTaskWorkerManager", 2, 1024, true)
taskman.RegisterTaskAndWorker(ImageCheckTask{}, checkWorker)
}
func (self *ImageCheckTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {

View File

@@ -5,6 +5,7 @@ import (
"yunion.io/x/jsonutils"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/image/models"
@@ -15,7 +16,8 @@ type ImageConvertTask struct {
}
func init() {
taskman.RegisterTask(ImageConvertTask{})
convertWorker := appsrv.NewWorkerManager("ImageConvertTaskWorkerManager", 2, 512, true)
taskman.RegisterTaskAndWorker(ImageConvertTask{}, convertWorker)
}
func (self *ImageConvertTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {