From 4d1d4e56fd55f8e10a14206feff6ef93e7ca8d15 Mon Sep 17 00:00:00 2001 From: Qiu Jian Date: Mon, 22 Oct 2018 01:38:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=EF=BC=9Aworker=20timeout?= =?UTF-8?q?=E4=B9=8B=E5=90=8E=E9=9C=80=E8=A6=81=E4=BB=8Eworker=20manager?= =?UTF-8?q?=E4=B8=AD=E7=A7=BB=E9=99=A4=EF=BC=8C=E4=B8=8D=E5=86=8D=E5=8D=A0?= =?UTF-8?q?=E7=94=A8worker=E7=9A=84=E6=95=B0=E9=87=8F=E9=99=90=E9=A2=9D?= =?UTF-8?q?=EF=BC=8C=E5=90=A6=E5=88=99timeout=E4=B9=8B=E5=90=8E=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E4=B8=8D=E8=83=BD=E8=A7=A3=E5=86=B3=E6=95=B4=E4=B8=AA?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E8=A2=AB=E5=8D=A1=E4=BD=8F=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/appsrv/appsrv.go | 9 +- pkg/appsrv/response.go | 10 +- pkg/appsrv/workers.go | 194 ++++++++++++++---- pkg/appsrv/workers_test.go | 19 +- pkg/cloudcommon/db/taskman/handler.go | 4 +- pkg/cloudcommon/db/taskman/localtaskworker.go | 4 +- pkg/util/logclient/logclient.go | 4 +- 7 files changed, 185 insertions(+), 59 deletions(-) diff --git a/pkg/appsrv/appsrv.go b/pkg/appsrv/appsrv.go index f434bd44b4..c31936ceb7 100644 --- a/pkg/appsrv/appsrv.go +++ b/pkg/appsrv/appsrv.go @@ -22,7 +22,7 @@ import ( type Application struct { name string context context.Context - session *WorkerManager + session *SWorkerManager roots map[string]*RadixNode rootLock *sync.Mutex connMax int @@ -42,7 +42,7 @@ const ( DEFAULT_READ_TIMEOUT = 0 DEFAULT_READ_HEADER_TIMEOUT = 10 * time.Second DEFAULT_WRITE_TIMEOUT = 0 - DEFAULT_PROCESS_TIMEOUT = 15 * time.Second + DEFAULT_PROCESS_TIMEOUT = 15 * time.Millisecond ) func NewApplication(name string, connMax int) *Application { @@ -189,6 +189,7 @@ func (app *Application) defaultHandle(w http.ResponseWriter, r *http.Request, ri hand, ok := handler.(*handlerInfo) if ok { fw := newResponseWriterChannel(w) + worker := make(chan *SWorker) errChan := make(chan interface{}) ctx, cancel := context.WithTimeout(app.context, app.processTimeout) defer cancel() @@ -209,8 +210,8 @@ func (app *Application) defaultHandle(w http.ResponseWriter, r *http.Request, ri hand.handler(ctx, &fw, r) }() } // otherwise, the task has been timeout - }, errChan) - runErr := fw.wait(ctx, errChan) + }, worker, errChan) + runErr := fw.wait(ctx, worker, errChan) if runErr != nil { switch runErr.(type) { case *httputils.JSONClientError: diff --git a/pkg/appsrv/response.go b/pkg/appsrv/response.go index c2a4263700..6bf18eea4c 100644 --- a/pkg/appsrv/response.go +++ b/pkg/appsrv/response.go @@ -4,6 +4,8 @@ import ( "context" "net/http" + "yunion.io/x/log" + "yunion.io/x/onecloud/pkg/httperrors" ) @@ -43,13 +45,19 @@ func (w *responseWriterChannel) WriteHeader(status int) { <-w.statusResp } -func (w *responseWriterChannel) wait(ctx context.Context, errChan chan interface{}) interface{} { +func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWorker, errChan chan interface{}) interface{} { var err interface{} + var worker *SWorker stop := false for !stop { select { + case worker = <-workerChan: + log.Infof("request is being handled by worker %s", worker) case <-ctx.Done(): // ctx deadline reached, timeout + if worker != nil { + worker.Detach("timeout") + } err = httperrors.NewTimeoutError("request process timeout") stop = true case e, more := <-errChan: diff --git a/pkg/appsrv/workers.go b/pkg/appsrv/workers.go index 675853be3d..bffed2f678 100644 --- a/pkg/appsrv/workers.go +++ b/pkg/appsrv/workers.go @@ -4,61 +4,164 @@ import ( "runtime/debug" "sync" + "container/list" + "fmt" "yunion.io/x/log" ) -type WorkerManager struct { - name string - queue *Ring - workerCount int - backlog int - activeWorker int - workerLock *sync.Mutex - workerId uint64 +const ( + WORKER_STATE_ACTIVE = 0 + WORKER_STATE_DETACH = 1 +) + +var isDebug = false + +func enableDebug() { + isDebug = true } -func NewWorkerManager(name string, workerCount int, backlog int) *WorkerManager { - manager := WorkerManager{name: name, - queue: NewRing(workerCount * backlog), - workerCount: workerCount, - backlog: backlog, - activeWorker: 0, - workerLock: &sync.Mutex{}, - workerId: 0} +type SWorker struct { + id uint64 + state int + container *list.Element + manager *SWorkerManager +} + +func newWorker(id uint64, manager *SWorkerManager) *SWorker { + return &SWorker{ + id: id, + state: WORKER_STATE_ACTIVE, + container: nil, + manager: manager, + } +} + +func (worker *SWorker) isDetached() bool { + worker.manager.workerLock.Lock() + defer worker.manager.workerLock.Unlock() + + return worker.state == WORKER_STATE_DETACH +} + +func (worker *SWorker) run() { + for { + if worker.isDetached() { + if isDebug { + log.Debugf("deteched worker %s, no need to pick up new job", worker) + } + break + } + req := worker.manager.queue.Pop() + if req != nil { + task := req.(*sWorkerTask) + if task.worker != nil { + task.worker <- worker + } + if isDebug { + log.Debugf("start exec task on worker %s", worker) + } + execCallback(task) + if isDebug { + log.Debugf("end exec task on worker %s", worker) + } + } else { + if isDebug { + log.Debugf("no more job, exit worker %s", worker) + } + break + } + } + worker.manager.removeWorker(worker) +} + +func (worker *SWorker) Detach(reason string) { + worker.manager.workerLock.Lock() + defer worker.manager.workerLock.Unlock() + + worker.state = WORKER_STATE_DETACH + worker.manager.activeWorker.removeWithLock(worker) + worker.manager.detachedWorker.addWithLock(worker) + + log.Warningf("detach worker %s due to reason %s", worker, reason) +} + +func (worker *SWorker) String() string { + return fmt.Sprintf("#%d(%d)", worker.id, worker.state) +} + +type SWorkerList struct { + list *list.List +} + +func newWorkerList() SWorkerList { + return SWorkerList{ + list: list.New(), + } +} + +func (wl *SWorkerList) addWithLock(worker *SWorker) { + ele := wl.list.PushBack(worker) + worker.container = ele +} + +func (wl *SWorkerList) removeWithLock(worker *SWorker) { + wl.list.Remove(worker.container) + worker.container = nil +} + +func (wl *SWorkerList) size() int { + return wl.list.Len() +} + +type SWorkerManager struct { + name string + queue *Ring + workerCount int + backlog int + activeWorker SWorkerList + detachedWorker SWorkerList + workerLock *sync.Mutex + workerId uint64 +} + +func NewWorkerManager(name string, workerCount int, backlog int) *SWorkerManager { + manager := SWorkerManager{name: name, + queue: NewRing(workerCount * backlog), + workerCount: workerCount, + backlog: backlog, + activeWorker: newWorkerList(), + detachedWorker: newWorkerList(), + workerLock: &sync.Mutex{}, + workerId: 0} return &manager } -type workerTask struct { - task func() - err chan interface{} +type sWorkerTask struct { + task func() + worker chan *SWorker + err chan interface{} } -func (wm *WorkerManager) Run(task func(), err chan interface{}) bool { - ret := wm.queue.Push(&workerTask{task: task, err: err}) +func (wm *SWorkerManager) Run(task func(), worker chan *SWorker, err chan interface{}) bool { + ret := wm.queue.Push(&sWorkerTask{task: task, worker: worker, err: err}) if ret { wm.schedule() } return ret } -func (wm *WorkerManager) workerRun(id uint64) { - //log.Println("Start worker", id) - defer wm.decActiveWorker() - req := wm.queue.Pop() - for req != nil { - wm.execCallback(req.(*workerTask)) - req = wm.queue.Pop() - } - //log.Println("End worker", id) -} - -func (wm *WorkerManager) decActiveWorker() { +func (wm *SWorkerManager) removeWorker(worker *SWorker) { wm.workerLock.Lock() defer wm.workerLock.Unlock() - wm.activeWorker -= 1 + + if worker.state == WORKER_STATE_ACTIVE { + wm.activeWorker.removeWithLock(worker) + } else { + wm.detachedWorker.removeWithLock(worker) + } } -func (wm *WorkerManager) execCallback(task *workerTask) { +func execCallback(task *sWorkerTask) { defer func() { if r := recover(); r != nil { log.Errorf("WorkerManager exec callback error: %s", r) @@ -75,16 +178,29 @@ func (wm *WorkerManager) execCallback(task *workerTask) { } } -func (wm *WorkerManager) schedule() { +func (wm *SWorkerManager) schedule() { wm.workerLock.Lock() defer wm.workerLock.Unlock() - if wm.activeWorker < wm.workerCount && wm.queue.Size() > 0 { - wm.activeWorker += 1 + + if wm.activeWorker.size() < wm.workerCount && wm.queue.Size() > 0 { wm.workerId += 1 - go wm.workerRun(wm.workerId) + worker := newWorker(wm.workerId, wm) + wm.activeWorker.addWithLock(worker) + if isDebug { + log.Debugf("no enough worker, add new worker %s", worker) + } + go worker.run() } } +func (wm *SWorkerManager) ActiveWorkerCount() int { + return wm.activeWorker.size() +} + +func (wm *SWorkerManager) DetachedWorkerCount() int { + return wm.detachedWorker.size() +} + func WaitChannel(ch chan interface{}) interface{} { var ret interface{} stop := false diff --git a/pkg/appsrv/workers_test.go b/pkg/appsrv/workers_test.go index 69f4877b9f..bc1527ab37 100644 --- a/pkg/appsrv/workers_test.go +++ b/pkg/appsrv/workers_test.go @@ -6,22 +6,22 @@ import ( ) func TestWorkerManager(t *testing.T) { + enableDebug() startTime := time.Now() - end := make(chan int) + // end := make(chan int) wm := NewWorkerManager("testwm", 2, 10) counter := 0 for i := 0; i < 10; i += 1 { wm.Run(func() { counter += 1 time.Sleep(1 * time.Second) - if counter >= i { - end <- 1 - } - }, nil) + }, nil, nil) + } + for wm.ActiveWorkerCount() != 0 { + time.Sleep(time.Second) } - <-end if time.Since(startTime) < 5*time.Second { - t.Error("Increct timing") + t.Error("Incorrect timing") } } @@ -30,7 +30,7 @@ func TestWorkerManagerError(t *testing.T) { err := make(chan interface{}) wm.Run(func() { panic("Panic inside worker") - }, err) + }, nil, err) e := WaitChannel(err) if e == nil { t.Error("Panic not captured") @@ -38,9 +38,10 @@ func TestWorkerManagerError(t *testing.T) { err = make(chan interface{}) wm.Run(func() { time.Sleep(1 * time.Second) - }, err) + }, nil, err) e = WaitChannel(err) if e != nil { t.Error("Should no error") } + } diff --git a/pkg/cloudcommon/db/taskman/handler.go b/pkg/cloudcommon/db/taskman/handler.go index beb9de938b..23dc6dff15 100644 --- a/pkg/cloudcommon/db/taskman/handler.go +++ b/pkg/cloudcommon/db/taskman/handler.go @@ -8,7 +8,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/db" ) -var taskWorkMan *appsrv.WorkerManager +var taskWorkMan *appsrv.SWorkerManager func init() { taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 100) @@ -22,5 +22,5 @@ func AddTaskHandler(prefix string, app *appsrv.Application) { func runTask(taskId string, data jsonutils.JSONObject) { taskWorkMan.Run(func() { TaskManager.execTask(taskId, data) - }, nil) + }, nil, nil) } diff --git a/pkg/cloudcommon/db/taskman/localtaskworker.go b/pkg/cloudcommon/db/taskman/localtaskworker.go index f60e8b146f..57eaf00fb5 100644 --- a/pkg/cloudcommon/db/taskman/localtaskworker.go +++ b/pkg/cloudcommon/db/taskman/localtaskworker.go @@ -9,7 +9,7 @@ import ( "yunion.io/x/onecloud/pkg/appsrv" ) -var localTaskWorkerMan *appsrv.WorkerManager +var localTaskWorkerMan *appsrv.SWorkerManager func init() { localTaskWorkerMan = appsrv.NewWorkerManager("LocalTaskWorkerManager", 4, 10) @@ -42,5 +42,5 @@ func LocalTaskRun(task ITask, proc func() (jsonutils.JSONObject, error)) { task.ScheduleRun(data) } - }, nil) + }, nil, nil) } diff --git a/pkg/util/logclient/logclient.go b/pkg/util/logclient/logclient.go index 0d8a541788..176d55f9b9 100644 --- a/pkg/util/logclient/logclient.go +++ b/pkg/util/logclient/logclient.go @@ -60,7 +60,7 @@ const ( // golang 不支持 const 的string array, http://t.cn/EzAvbw8 var BLACK_LIST_OBJ_TYPE = []string{"parameter"} -var logclientWorkerMan *appsrv.WorkerManager +var logclientWorkerMan *appsrv.SWorkerManager func init() { logclientWorkerMan = appsrv.NewWorkerManager("LogClientWorkerManager", 1, 50) @@ -119,5 +119,5 @@ func AddActionLog(model IObject, action string, iNotes interface{}, userCred mcc if err != nil { log.Errorf("create action log failed %s", err) } - }, nil) + }, nil, nil) }