mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-05-07 14:11:05 +08:00
218 lines
6.4 KiB
Go
218 lines
6.4 KiB
Go
// Copyright 2019 Yunion
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cloudcommon
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mattn/go-sqlite3"
|
|
|
|
"yunion.io/x/jsonutils"
|
|
"yunion.io/x/log"
|
|
"yunion.io/x/pkg/errors"
|
|
"yunion.io/x/sqlchemy"
|
|
_ "yunion.io/x/sqlchemy/backends"
|
|
|
|
noapi "yunion.io/x/onecloud/pkg/apis/notify"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/etcd"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/informer"
|
|
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
|
|
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
|
|
"yunion.io/x/onecloud/pkg/util/dbutils"
|
|
)
|
|
|
|
func InitDBConn(options *common_options.DBOptions) {
|
|
dialect, sqlStr, err := options.GetDBConnection()
|
|
if err != nil {
|
|
log.Fatalf("Invalid SqlConnection string: %s error: %v", options.SqlConnection, err)
|
|
}
|
|
backend := sqlchemy.MySQLBackend
|
|
switch dialect {
|
|
case "dm":
|
|
backend = sqlchemy.DamengBackend
|
|
dialect = "dm"
|
|
sqlStr = "dm://" + sqlStr
|
|
case "sqlite3":
|
|
backend = sqlchemy.SQLiteBackend
|
|
dialect = "sqlite3_with_extensions"
|
|
sql.Register(dialect,
|
|
&sqlite3.SQLiteDriver{
|
|
Extensions: []string{
|
|
"/opt/yunion/share/sqlite/inet",
|
|
},
|
|
},
|
|
)
|
|
case "clickhouse":
|
|
log.Fatalf("cannot use clickhouse as primary database")
|
|
}
|
|
log.Infof("database dialect: %s sqlStr: %s", dialect, sqlStr)
|
|
// save configuration to consts
|
|
consts.SetDefaultDB(dialect, sqlStr)
|
|
dbConn, err := sql.Open(dialect, sqlStr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
sqlchemy.SetDBWithNameBackend(dbConn, sqlchemy.DefaultDB, backend)
|
|
|
|
if options.DbMaxWaitTimeoutSeconds <= 300 {
|
|
options.DbMaxWaitTimeoutSeconds = 3600
|
|
}
|
|
// ConnMaxLifetime is the maximum amount of time a connection may be reused.
|
|
// mysql default max_waitimeout is 28800 seconds, 1 hour should be enough
|
|
// but if user set a customized mysql max_waittimeout, the value should be adjusted accordingly
|
|
dbConn.SetConnMaxLifetime(time.Duration(options.DbMaxWaitTimeoutSeconds) * time.Second)
|
|
// ConnMaxIdleTime should be half of ConnMaxLifetime
|
|
dbConn.SetConnMaxIdleTime(time.Duration(options.DbMaxWaitTimeoutSeconds/2) * time.Second)
|
|
}
|
|
|
|
func InitClickhouseConn(options *common_options.DBOptions) {
|
|
dialect, sqlStr, err := options.GetClickhouseConnStr()
|
|
if err == nil {
|
|
// connect to clickcloud
|
|
// force convert sqlstr from clickhouse v2 to v1
|
|
sqlStr, err = dbutils.ClickhouseSqlStrV2ToV1(sqlStr)
|
|
if err != nil {
|
|
log.Fatalf("fail to convert clickhouse sqlstr from v2 to v1: %s", err)
|
|
}
|
|
err = dbutils.ValidateClickhouseV1Str(sqlStr)
|
|
if err != nil {
|
|
log.Fatalf("invalid clickhouse sqlstr: %s", err)
|
|
}
|
|
click, err := sql.Open(dialect, sqlStr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
sqlchemy.SetDBWithNameBackend(click, db.ClickhouseDB, sqlchemy.ClickhouseBackend)
|
|
|
|
if options.OpsLogWithClickhouse {
|
|
consts.OpsLogWithClickhouse = true
|
|
}
|
|
}
|
|
}
|
|
|
|
func InitDB(options *common_options.DBOptions) {
|
|
if options.DebugSqlchemy {
|
|
log.Warningf("debug Sqlchemy is turned on")
|
|
sqlchemy.DEBUG_SQLCHEMY = true
|
|
}
|
|
|
|
log.Infof("Registered SQL drivers: %s", strings.Join(sql.Drivers(), ", "))
|
|
|
|
consts.QueryOffsetOptimization = options.QueryOffsetOptimization
|
|
|
|
if options.HistoricalUniqueName {
|
|
consts.EnableHistoricalUniqueName()
|
|
} else {
|
|
consts.DisableHistoricalUniqueName()
|
|
}
|
|
|
|
if options.OpsLogMaxKeepMonths > 0 {
|
|
consts.SetSplitableMaxKeepMonths(options.OpsLogMaxKeepMonths)
|
|
}
|
|
if options.SplitableMaxDurationHours > 0 {
|
|
consts.SetSplitableMaxDurationHours(options.SplitableMaxDurationHours)
|
|
}
|
|
|
|
InitDBConn(options)
|
|
InitClickhouseConn(options)
|
|
|
|
switch options.LockmanMethod {
|
|
case common_options.LockMethodInMemory, "":
|
|
log.Infof("using inmemory lockman")
|
|
lm := lockman.NewInMemoryLockManager()
|
|
lockman.Init(lm)
|
|
case common_options.LockMethodEtcd:
|
|
log.Infof("using etcd lockman")
|
|
tlsCfg, err := options.GetEtcdTLSConfig()
|
|
if err != nil {
|
|
log.Fatalln(err.Error())
|
|
}
|
|
lm, err := lockman.NewEtcdLockManager(&lockman.SEtcdLockManagerConfig{
|
|
Endpoints: options.EtcdEndpoints,
|
|
Username: options.EtcdUsername,
|
|
Password: options.EtcdPassword,
|
|
LockTTL: options.EtcdLockTTL,
|
|
LockPrefix: options.EtcdLockPrefix,
|
|
TLS: tlsCfg,
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("etcd lockman: %v", err)
|
|
}
|
|
lockman.Init(lm)
|
|
}
|
|
// lm := lockman.NewNoopLockManager()
|
|
|
|
if options.EnableDBChecksumTables && len(options.DBChecksumHashAlgorithm) > 0 {
|
|
consts.SetDefaultDBChecksumHashAlgorithm(options.DBChecksumHashAlgorithm)
|
|
}
|
|
|
|
initDBNotifier()
|
|
startInitInformer(options)
|
|
}
|
|
|
|
func initDBNotifier() {
|
|
db.SetChecksumTestFailedNotifier(func(obj *jsonutils.JSONDict) {
|
|
notifyclient.SystemExceptionNotifyWithResult(context.TODO(), noapi.ActionChecksumTest, noapi.TOPIC_RESOURCE_DB_TABLE_RECORD, noapi.ResultFailed, obj)
|
|
})
|
|
}
|
|
|
|
// startInitInformer starts goroutine init informer backend
|
|
func startInitInformer(options *common_options.DBOptions) {
|
|
go func() {
|
|
if len(options.EtcdEndpoints) == 0 {
|
|
return
|
|
}
|
|
for {
|
|
log.Infof("using etcd as resource informer backend")
|
|
if err := initInformer(options); err != nil {
|
|
log.Errorf("Init informer error: %v", err)
|
|
time.Sleep(10 * time.Second)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func initInformer(options *common_options.DBOptions) error {
|
|
tlsCfg, err := options.GetEtcdTLSConfig()
|
|
if err != nil {
|
|
return errors.Wrap(err, "get etcd informer backend tls config")
|
|
}
|
|
informerBackend, err := informer.NewEtcdBackend(&etcd.SEtcdOptions{
|
|
EtcdEndpoint: options.EtcdEndpoints,
|
|
EtcdTimeoutSeconds: 5,
|
|
EtcdRequestTimeoutSeconds: 2,
|
|
EtcdLeaseExpireSeconds: 5,
|
|
EtcdEnabldSsl: options.EtcdUseTLS,
|
|
TLSConfig: tlsCfg,
|
|
}, nil)
|
|
if err != nil {
|
|
return errors.Wrap(err, "new etcd informer backend")
|
|
}
|
|
informer.Init(informerBackend)
|
|
return nil
|
|
}
|
|
|
|
func CloseDB() {
|
|
sqlchemy.CloseDB()
|
|
}
|