fix(worker): policy and task worker count configurable (#15716)

This commit is contained in:
wanyaoqi
2023-01-05 22:37:14 +08:00
committed by GitHub
parent cf238e7543
commit 58a20e4bf4
8 changed files with 43 additions and 7 deletions

View File

@@ -341,7 +341,7 @@ func init() {
}
R(&PolicyAdminCapableOptions{}, "policy-admin-capable", "Check admin capable", func(s *mcclient.ClientSession, args *PolicyAdminCapableOptions) error {
auth.InitFromClientSession(s)
policy.EnableGlobalRbac(15*time.Second, false)
policy.EnableGlobalRbac(15*time.Second, false, 1)
var token mcclient.TokenCredential
if len(args.User) > 0 {
@@ -395,7 +395,7 @@ func init() {
rbacutils.ShowMatchRuleDebug = true
}
auth.InitFromClientSession(s)
policy.EnableGlobalRbac(15*time.Second, false)
policy.EnableGlobalRbac(15*time.Second, false, 1)
if args.Debug {
consts.EnableRbacDebug()
}

View File

@@ -25,6 +25,7 @@ import (
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
)
const (
@@ -199,6 +200,17 @@ type sWorkerTask struct {
start time.Time
}
func (wm *SWorkerManager) UpdateWorkerCount(workerCount int) error {
wm.workerLock.Lock()
defer wm.workerLock.Unlock()
if wm.queue.Size() > 0 {
return errors.Errorf("worker queue is not empty")
}
wm.queue = NewRing(workerCount * wm.backlog)
wm.workerCount = workerCount
return nil
}
func (wm *SWorkerManager) String() string {
return wm.name
}

View File

@@ -96,6 +96,7 @@ func InitBaseAuth(options *common_options.BaseOptions) {
policy.EnableGlobalRbac(
time.Second*time.Duration(options.RbacPolicyRefreshIntervalSeconds),
options.RbacDebug,
options.PolicyWorkerCount,
)
}
consts.SetNonDefaultDomainProjects(options.NonDefaultDomainProjects)

View File

@@ -20,20 +20,33 @@ import (
"runtime/debug"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
api "yunion.io/x/onecloud/pkg/apis/notify"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
)
const (
DEFAULT_WORKER_COUNT = 4
)
var taskWorkMan *appsrv.SWorkerManager
var taskWorkerTable map[string]*appsrv.SWorkerManager
func init() {
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", 4, 1024, true)
taskWorkMan = appsrv.NewWorkerManager("TaskWorkerManager", DEFAULT_WORKER_COUNT, 1024, true)
taskWorkerTable = make(map[string]*appsrv.SWorkerManager)
}
func UpdateWorkerCount(workerCount int) error {
if workerCount != DEFAULT_WORKER_COUNT {
log.Infof("update task work count: %d", workerCount)
return taskWorkMan.UpdateWorkerCount(workerCount)
}
return nil
}
type taskTask struct {
taskId string
data jsonutils.JSONObject

View File

@@ -64,6 +64,7 @@ type BaseOptions struct {
ApplicationID string `help:"Application ID"`
RequestWorkerCount int `default:"8" help:"Request worker thread count, default is 8"`
TaskWorkerCount int `default:"4" help:"Task manager worker thread count, default is 4"`
EnableSsl bool `help:"Enable https"`
SslCaCerts string `help:"ssl certificate ca root file, separating ca and cert file is not encouraged" alias:"ca-file"`
@@ -77,6 +78,7 @@ type BaseOptions struct {
RbacDebug bool `help:"turn on rbac debug log" default:"false"`
RbacPolicyRefreshIntervalSeconds int `help:"policy refresh interval in seconds, default half a minute" default:"30"`
// RbacPolicySyncFailedRetrySeconds int `help:"seconds to wait after a failed sync, default 30 seconds" default:"30"`
PolicyWorkerCount int `help:"Policy worker count" default:"1"`
ConfigSyncPeriodSeconds int `help:"service config sync interval in seconds, default 30 minutes" default:"1800"`

View File

@@ -20,12 +20,12 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
)
func EnableGlobalRbac(refreshInterval time.Duration, debug bool) {
func EnableGlobalRbac(refreshInterval time.Duration, debug bool, workerCount int) {
if !consts.IsRbacEnabled() {
consts.EnableRbac()
if debug {
consts.EnableRbacDebug()
}
PolicyManager.init(refreshInterval)
PolicyManager.init(refreshInterval, workerCount)
}
}

View File

@@ -94,7 +94,7 @@ func (data sPolicyData) getPolicy() (rbacutils.TPolicy, error) {
return rbacutils.DecodePolicyData(data.Policy)
}
func (manager *SPolicyManager) init(refreshInterval time.Duration) {
func (manager *SPolicyManager) init(refreshInterval time.Duration, workerCount int) {
manager.refreshInterval = refreshInterval
// manager.InitSync(manager)
if len(predefinedDefaultPolicies) > 0 {
@@ -125,7 +125,11 @@ func (manager *SPolicyManager) init(refreshInterval time.Duration) {
isDB = true
}
manager.fetchWorker = appsrv.NewWorkerManager("policyFetchWorker", 1, 2048, isDB)
if workerCount <= 0 {
workerCount = 1
}
log.Infof("policy fetch worker count %d", workerCount)
manager.fetchWorker = appsrv.NewWorkerManager("policyFetchWorker", workerCount, 2048, isDB)
}
func getMaskedLoginIp(userCred mcclient.TokenCredential) string {

View File

@@ -74,6 +74,10 @@ func StartService() {
}
log.Infof("serviceUrl: %s", serviceUrl)
taskman.SetServiceUrl(serviceUrl)
err = taskman.UpdateWorkerCount(opts.TaskWorkerCount)
if err != nil {
log.Fatalf("failed update task manager worker count %s", err)
}
err = esxi.InitEsxiConfig(opts.EsxiOptions)
if err != nil {