mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-06-07 23:09:17 +08:00
修正:worker timeout之后需要从worker
manager中移除,不再占用worker的数量限额,否则timeout之后,并不能解决整个服务被卡住的问题
This commit is contained in:
@@ -22,7 +22,7 @@ import (
|
||||
type Application struct {
|
||||
name string
|
||||
context context.Context
|
||||
session *WorkerManager
|
||||
session *SWorkerManager
|
||||
roots map[string]*RadixNode
|
||||
rootLock *sync.Mutex
|
||||
connMax int
|
||||
@@ -42,7 +42,7 @@ const (
|
||||
DEFAULT_READ_TIMEOUT = 0
|
||||
DEFAULT_READ_HEADER_TIMEOUT = 10 * time.Second
|
||||
DEFAULT_WRITE_TIMEOUT = 0
|
||||
DEFAULT_PROCESS_TIMEOUT = 15 * time.Second
|
||||
DEFAULT_PROCESS_TIMEOUT = 15 * time.Millisecond
|
||||
)
|
||||
|
||||
func NewApplication(name string, connMax int) *Application {
|
||||
@@ -189,6 +189,7 @@ func (app *Application) defaultHandle(w http.ResponseWriter, r *http.Request, ri
|
||||
hand, ok := handler.(*handlerInfo)
|
||||
if ok {
|
||||
fw := newResponseWriterChannel(w)
|
||||
worker := make(chan *SWorker)
|
||||
errChan := make(chan interface{})
|
||||
ctx, cancel := context.WithTimeout(app.context, app.processTimeout)
|
||||
defer cancel()
|
||||
@@ -209,8 +210,8 @@ func (app *Application) defaultHandle(w http.ResponseWriter, r *http.Request, ri
|
||||
hand.handler(ctx, &fw, r)
|
||||
}()
|
||||
} // otherwise, the task has been timeout
|
||||
}, errChan)
|
||||
runErr := fw.wait(ctx, errChan)
|
||||
}, worker, errChan)
|
||||
runErr := fw.wait(ctx, worker, errChan)
|
||||
if runErr != nil {
|
||||
switch runErr.(type) {
|
||||
case *httputils.JSONClientError:
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"yunion.io/x/log"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/httperrors"
|
||||
)
|
||||
|
||||
@@ -43,13 +45,19 @@ func (w *responseWriterChannel) WriteHeader(status int) {
|
||||
<-w.statusResp
|
||||
}
|
||||
|
||||
func (w *responseWriterChannel) wait(ctx context.Context, errChan chan interface{}) interface{} {
|
||||
func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWorker, errChan chan interface{}) interface{} {
|
||||
var err interface{}
|
||||
var worker *SWorker
|
||||
stop := false
|
||||
for !stop {
|
||||
select {
|
||||
case worker = <-workerChan:
|
||||
log.Infof("request is being handled by worker %s", worker)
|
||||
case <-ctx.Done():
|
||||
// ctx deadline reached, timeout
|
||||
if worker != nil {
|
||||
worker.Detach("timeout")
|
||||
}
|
||||
err = httperrors.NewTimeoutError("request process timeout")
|
||||
stop = true
|
||||
case e, more := <-errChan:
|
||||
|
||||
@@ -4,61 +4,164 @@ import (
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
|
||||
"container/list"
|
||||
"fmt"
|
||||
"yunion.io/x/log"
|
||||
)
|
||||
|
||||
type WorkerManager struct {
|
||||
name string
|
||||
queue *Ring
|
||||
workerCount int
|
||||
backlog int
|
||||
activeWorker int
|
||||
workerLock *sync.Mutex
|
||||
workerId uint64
|
||||
// Lifecycle states stored in SWorker.state.
const (
	// WORKER_STATE_ACTIVE is the state a worker is created with.
	WORKER_STATE_ACTIVE = iota
	// WORKER_STATE_DETACH is the state set once a worker has been detached.
	WORKER_STATE_DETACH
)
|
||||
|
||||
// isDebug switches the verbose debug logging of this package on or off.
// It starts out disabled.
var isDebug bool

// enableDebug turns the verbose debug logging on.
func enableDebug() {
	isDebug = true
}
|
||||
|
||||
func NewWorkerManager(name string, workerCount int, backlog int) *WorkerManager {
|
||||
manager := WorkerManager{name: name,
|
||||
queue: NewRing(workerCount * backlog),
|
||||
workerCount: workerCount,
|
||||
backlog: backlog,
|
||||
activeWorker: 0,
|
||||
workerLock: &sync.Mutex{},
|
||||
workerId: 0}
|
||||
type SWorker struct {
|
||||
id uint64
|
||||
state int
|
||||
container *list.Element
|
||||
manager *SWorkerManager
|
||||
}
|
||||
|
||||
func newWorker(id uint64, manager *SWorkerManager) *SWorker {
|
||||
return &SWorker{
|
||||
id: id,
|
||||
state: WORKER_STATE_ACTIVE,
|
||||
container: nil,
|
||||
manager: manager,
|
||||
}
|
||||
}
|
||||
|
||||
func (worker *SWorker) isDetached() bool {
|
||||
worker.manager.workerLock.Lock()
|
||||
defer worker.manager.workerLock.Unlock()
|
||||
|
||||
return worker.state == WORKER_STATE_DETACH
|
||||
}
|
||||
|
||||
func (worker *SWorker) run() {
|
||||
for {
|
||||
if worker.isDetached() {
|
||||
if isDebug {
|
||||
log.Debugf("deteched worker %s, no need to pick up new job", worker)
|
||||
}
|
||||
break
|
||||
}
|
||||
req := worker.manager.queue.Pop()
|
||||
if req != nil {
|
||||
task := req.(*sWorkerTask)
|
||||
if task.worker != nil {
|
||||
task.worker <- worker
|
||||
}
|
||||
if isDebug {
|
||||
log.Debugf("start exec task on worker %s", worker)
|
||||
}
|
||||
execCallback(task)
|
||||
if isDebug {
|
||||
log.Debugf("end exec task on worker %s", worker)
|
||||
}
|
||||
} else {
|
||||
if isDebug {
|
||||
log.Debugf("no more job, exit worker %s", worker)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
worker.manager.removeWorker(worker)
|
||||
}
|
||||
|
||||
func (worker *SWorker) Detach(reason string) {
|
||||
worker.manager.workerLock.Lock()
|
||||
defer worker.manager.workerLock.Unlock()
|
||||
|
||||
worker.state = WORKER_STATE_DETACH
|
||||
worker.manager.activeWorker.removeWithLock(worker)
|
||||
worker.manager.detachedWorker.addWithLock(worker)
|
||||
|
||||
log.Warningf("detach worker %s due to reason %s", worker, reason)
|
||||
}
|
||||
|
||||
func (worker *SWorker) String() string {
|
||||
return fmt.Sprintf("#%d(%d)", worker.id, worker.state)
|
||||
}
|
||||
|
||||
// SWorkerList is a collection of workers backed by a container/list list.
type SWorkerList struct {
	// list holds the *SWorker values that are members of this collection.
	list *list.List
}
|
||||
|
||||
func newWorkerList() SWorkerList {
|
||||
return SWorkerList{
|
||||
list: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func (wl *SWorkerList) addWithLock(worker *SWorker) {
|
||||
ele := wl.list.PushBack(worker)
|
||||
worker.container = ele
|
||||
}
|
||||
|
||||
func (wl *SWorkerList) removeWithLock(worker *SWorker) {
|
||||
wl.list.Remove(worker.container)
|
||||
worker.container = nil
|
||||
}
|
||||
|
||||
func (wl *SWorkerList) size() int {
|
||||
return wl.list.Len()
|
||||
}
|
||||
|
||||
type SWorkerManager struct {
|
||||
name string
|
||||
queue *Ring
|
||||
workerCount int
|
||||
backlog int
|
||||
activeWorker SWorkerList
|
||||
detachedWorker SWorkerList
|
||||
workerLock *sync.Mutex
|
||||
workerId uint64
|
||||
}
|
||||
|
||||
func NewWorkerManager(name string, workerCount int, backlog int) *SWorkerManager {
|
||||
manager := SWorkerManager{name: name,
|
||||
queue: NewRing(workerCount * backlog),
|
||||
workerCount: workerCount,
|
||||
backlog: backlog,
|
||||
activeWorker: newWorkerList(),
|
||||
detachedWorker: newWorkerList(),
|
||||
workerLock: &sync.Mutex{},
|
||||
workerId: 0}
|
||||
return &manager
|
||||
}
|
||||
|
||||
type workerTask struct {
|
||||
task func()
|
||||
err chan interface{}
|
||||
type sWorkerTask struct {
|
||||
task func()
|
||||
worker chan *SWorker
|
||||
err chan interface{}
|
||||
}
|
||||
|
||||
func (wm *WorkerManager) Run(task func(), err chan interface{}) bool {
|
||||
ret := wm.queue.Push(&workerTask{task: task, err: err})
|
||||
func (wm *SWorkerManager) Run(task func(), worker chan *SWorker, err chan interface{}) bool {
|
||||
ret := wm.queue.Push(&sWorkerTask{task: task, worker: worker, err: err})
|
||||
if ret {
|
||||
wm.schedule()
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (wm *WorkerManager) workerRun(id uint64) {
|
||||
//log.Println("Start worker", id)
|
||||
defer wm.decActiveWorker()
|
||||
req := wm.queue.Pop()
|
||||
for req != nil {
|
||||
wm.execCallback(req.(*workerTask))
|
||||
req = wm.queue.Pop()
|
||||
}
|
||||
//log.Println("End worker", id)
|
||||
}
|
||||
|
||||
func (wm *WorkerManager) decActiveWorker() {
|
||||
func (wm *SWorkerManager) removeWorker(worker *SWorker) {
|
||||
wm.workerLock.Lock()
|
||||
defer wm.workerLock.Unlock()
|
||||
wm.activeWorker -= 1
|
||||
|
||||
if worker.state == WORKER_STATE_ACTIVE {
|
||||
wm.activeWorker.removeWithLock(worker)
|
||||
} else {
|
||||
wm.detachedWorker.removeWithLock(worker)
|
||||
}
|
||||
}
|
||||
|
||||
func (wm *WorkerManager) execCallback(task *workerTask) {
|
||||
func execCallback(task *sWorkerTask) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Errorf("WorkerManager exec callback error: %s", r)
|
||||
@@ -75,16 +178,29 @@ func (wm *WorkerManager) execCallback(task *workerTask) {
|
||||
}
|
||||
}
|
||||
|
||||
func (wm *WorkerManager) schedule() {
|
||||
func (wm *SWorkerManager) schedule() {
|
||||
wm.workerLock.Lock()
|
||||
defer wm.workerLock.Unlock()
|
||||
if wm.activeWorker < wm.workerCount && wm.queue.Size() > 0 {
|
||||
wm.activeWorker += 1
|
||||
|
||||
if wm.activeWorker.size() < wm.workerCount && wm.queue.Size() > 0 {
|
||||
wm.workerId += 1
|
||||
go wm.workerRun(wm.workerId)
|
||||
worker := newWorker(wm.workerId, wm)
|
||||
wm.activeWorker.addWithLock(worker)
|
||||
if isDebug {
|
||||
log.Debugf("no enough worker, add new worker %s", worker)
|
||||
}
|
||||
go worker.run()
|
||||
}
|
||||
}
|
||||
|
||||
func (wm *SWorkerManager) ActiveWorkerCount() int {
|
||||
return wm.activeWorker.size()
|
||||
}
|
||||
|
||||
func (wm *SWorkerManager) DetachedWorkerCount() int {
|
||||
return wm.detachedWorker.size()
|
||||
}
|
||||
|
||||
func WaitChannel(ch chan interface{}) interface{} {
|
||||
var ret interface{}
|
||||
stop := false
|
||||
|
||||
@@ -6,22 +6,22 @@ import (
|
||||
)
|
||||
|
||||
func TestWorkerManager(t *testing.T) {
|
||||
enableDebug()
|
||||
startTime := time.Now()
|
||||
end := make(chan int)
|
||||
// end := make(chan int)
|
||||
wm := NewWorkerManager("testwm", 2, 10)
|
||||
counter := 0
|
||||
for i := 0; i < 10; i += 1 {
|
||||
wm.Run(func() {
|
||||
counter += 1
|
||||
time.Sleep(1 * time.Second)
|
||||
if counter >= i {
|
||||
end <- 1
|
||||
}
|
||||
}, nil)
|
||||
}, nil, nil)
|
||||
}
|
||||
for wm.ActiveWorkerCount() != 0 {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
<-end
|
||||
if time.Since(startTime) < 5*time.Second {
|
||||
t.Error("Increct timing")
|
||||
t.Error("Incorrect timing")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestWorkerManagerError(t *testing.T) {
|
||||
err := make(chan interface{})
|
||||
wm.Run(func() {
|
||||
panic("Panic inside worker")
|
||||
}, err)
|
||||
}, nil, err)
|
||||
e := WaitChannel(err)
|
||||
if e == nil {
|
||||
t.Error("Panic not captured")
|
||||
@@ -38,9 +38,10 @@ func TestWorkerManagerError(t *testing.T) {
|
||||
err = make(chan interface{})
|
||||
wm.Run(func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
}, err)
|
||||
}, nil, err)
|
||||
e = WaitChannel(err)
|
||||
if e != nil {
|
||||
t.Error("Should no error")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
||||
)
|
||||
|
||||
var taskWorkMan *appsrv.WorkerManager
|
||||
var taskWorkMan *appsrv.SWorkerManager
|
||||
|
||||
func init() {
|
||||
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 100)
|
||||
@@ -22,5 +22,5 @@ func AddTaskHandler(prefix string, app *appsrv.Application) {
|
||||
func runTask(taskId string, data jsonutils.JSONObject) {
|
||||
taskWorkMan.Run(func() {
|
||||
TaskManager.execTask(taskId, data)
|
||||
}, nil)
|
||||
}, nil, nil)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"yunion.io/x/onecloud/pkg/appsrv"
|
||||
)
|
||||
|
||||
var localTaskWorkerMan *appsrv.WorkerManager
|
||||
var localTaskWorkerMan *appsrv.SWorkerManager
|
||||
|
||||
func init() {
|
||||
localTaskWorkerMan = appsrv.NewWorkerManager("LocalTaskWorkerManager", 4, 10)
|
||||
@@ -42,5 +42,5 @@ func LocalTaskRun(task ITask, proc func() (jsonutils.JSONObject, error)) {
|
||||
task.ScheduleRun(data)
|
||||
}
|
||||
|
||||
}, nil)
|
||||
}, nil, nil)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ const (
|
||||
// golang 不支持 const 的string array, http://t.cn/EzAvbw8
|
||||
var BLACK_LIST_OBJ_TYPE = []string{"parameter"}
|
||||
|
||||
var logclientWorkerMan *appsrv.WorkerManager
|
||||
var logclientWorkerMan *appsrv.SWorkerManager
|
||||
|
||||
func init() {
|
||||
logclientWorkerMan = appsrv.NewWorkerManager("LogClientWorkerManager", 1, 50)
|
||||
@@ -119,5 +119,5 @@ func AddActionLog(model IObject, action string, iNotes interface{}, userCred mcc
|
||||
if err != nil {
|
||||
log.Errorf("create action log failed %s", err)
|
||||
}
|
||||
}, nil)
|
||||
}, nil, nil)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user