mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-05-06 13:42:10 +08:00
876 lines
18 KiB
Go
876 lines
18 KiB
Go
/*
|
|
* Copyright (c) 2000-2018, 达梦数据库有限公司.
|
|
* All rights reserved.
|
|
*/
|
|
package dm
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"gitee.com/chunanyong/dm/parser"
|
|
|
|
"gitee.com/chunanyong/dm/util"
|
|
"golang.org/x/text/encoding"
|
|
)
|
|
|
|
// DmConnection holds the driver-side state of one database session. It embeds
// filterable (whose filterChain the exported methods consult before
// dispatching) and keeps the session attributes, transaction flags and the
// context-cancellation plumbing used by the methods in this file.
type DmConnection struct {
	filterable
	// mu is carried over to the replacement connection by reconnect.
	mu sync.Mutex

	// dmConnector is the connector whose settings (autoCommit, charCode,
	// compatibleMode, ...) the methods in this file read.
	dmConnector *DmConnector
	// Access is the layer used to execute statements and to commit, roll back
	// and close the session.
	Access *dm_build_1345
	// stmtMap holds the connection's statements; reconnect re-registers
	// entries under stmt.id and close frees every entry.
	stmtMap map[int32]*DmStatement

	// lastExecInfo is replaced after each execution performed through this
	// connection.
	lastExecInfo       *execRetInfo
	lexer              *parser.Lexer
	encode             encoding.Encoding
	encodeBuffer       *bytes.Buffer
	transformReaderDst []byte
	transformReaderSrc []byte

	// serverEncoding is what getServerEncoding returns when the connector
	// does not set charCode.
	serverEncoding     string
	GlobalServerSeries int
	ServerVersion      string
	Malini2            bool
	Execute2           bool
	LobEmptyCompOrcl   bool
	// IsoLevel is compared with the requested isolation level in beginTx;
	// init and reset set it to -1.
	IsoLevel int32
	// ReadOnly is compared with the requested read-only option in beginTx.
	ReadOnly        bool
	NewLobFlag      bool
	sslEncrypt      int
	MaxRowSize      int32
	DDLAutoCommit   bool
	BackslashEscape bool
	SvrStat         int32
	SvrMode         int32
	// ConstParaOpt makes executeInner pass the SQL through execOpt first.
	ConstParaOpt       bool
	DbTimezone         int16
	LifeTimeRemainder  int16
	InstanceName       string
	Schema             string
	LastLoginIP        string
	LastLoginTime      string
	FailedAttempts     int32
	LoginWarningID     int32
	GraceTimeRemainder int32
	Guid               string
	DbName             string
	StandbyHost        string
	StandbyPort        int32
	StandbyCount       int32
	SessionID          int64
	OracleDateLanguage byte
	FormatDate         string
	FormatTimestamp    string
	FormatTimestampTZ  string
	FormatTime         string
	FormatTimeTZ       string
	Local              bool
	MsgVersion         int32
	TrxStatus          int32
	dscControl         bool
	// trxFinish is derived from a status word by setTrxFinish and set to true
	// after a successful commit or rollback.
	trxFinish bool
	// autoCommit is cleared by beginTx and restored from the connector's
	// setting when commit or rollback returns.
	autoCommit bool
	// isBatch is copied from the value converter in checkNamedValue.
	isBatch bool

	// watching is set by watchCancel once a context has been handed to the
	// watcher goroutine and cleared by finish.
	watching bool
	// watcher is the channel on which watchCancel sends the context to watch.
	watcher chan<- context.Context
	// closech is closed by close; finish and the watcher goroutine select on it.
	closech chan struct{}
	// finished is signalled by finish to tell the watcher the call is done.
	finished chan<- struct{}
	// canceled stores the error passed to cancel.
	canceled atomicError
	// closed is set by close and tested by checkClosed.
	closed atomicBool
}
|
|
|
|
func (conn *DmConnection) setTrxFinish(status int32) {
|
|
switch status & Dm_build_132 {
|
|
case Dm_build_129, Dm_build_130, Dm_build_131:
|
|
conn.trxFinish = true
|
|
default:
|
|
conn.trxFinish = false
|
|
}
|
|
}
|
|
|
|
func (dmConn *DmConnection) init() {
|
|
|
|
dmConn.stmtMap = make(map[int32]*DmStatement)
|
|
dmConn.DbTimezone = 0
|
|
dmConn.GlobalServerSeries = 0
|
|
dmConn.MaxRowSize = 0
|
|
dmConn.LobEmptyCompOrcl = false
|
|
dmConn.ReadOnly = false
|
|
dmConn.DDLAutoCommit = false
|
|
dmConn.ConstParaOpt = false
|
|
dmConn.IsoLevel = -1
|
|
dmConn.Malini2 = true
|
|
dmConn.NewLobFlag = true
|
|
dmConn.Execute2 = true
|
|
dmConn.serverEncoding = ENCODING_GB18030
|
|
dmConn.TrxStatus = Dm_build_80
|
|
dmConn.setTrxFinish(dmConn.TrxStatus)
|
|
dmConn.OracleDateLanguage = byte(Locale)
|
|
dmConn.lastExecInfo = NewExceInfo()
|
|
dmConn.MsgVersion = Dm_build_13
|
|
|
|
dmConn.idGenerator = dmConnIDGenerator
|
|
}
|
|
|
|
func (dmConn *DmConnection) reset() {
|
|
dmConn.DbTimezone = 0
|
|
dmConn.GlobalServerSeries = 0
|
|
dmConn.MaxRowSize = 0
|
|
dmConn.LobEmptyCompOrcl = false
|
|
dmConn.ReadOnly = false
|
|
dmConn.DDLAutoCommit = false
|
|
dmConn.ConstParaOpt = false
|
|
dmConn.IsoLevel = -1
|
|
dmConn.Malini2 = true
|
|
dmConn.NewLobFlag = true
|
|
dmConn.Execute2 = true
|
|
dmConn.serverEncoding = ENCODING_GB18030
|
|
dmConn.TrxStatus = Dm_build_80
|
|
dmConn.setTrxFinish(dmConn.TrxStatus)
|
|
}
|
|
|
|
func (dc *DmConnection) checkClosed() error {
|
|
if dc.closed.IsSet() {
|
|
return driver.ErrBadConn
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
|
|
|
|
stmt, err := NewDmStmt(dc, query)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if execType == Dm_build_97 {
|
|
defer stmt.close()
|
|
}
|
|
|
|
stmt.innerUsed = true
|
|
if stmt.dmConn.dmConnector.escapeProcess {
|
|
stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
|
|
if err != nil {
|
|
stmt.close()
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var optParamList []OptParameter
|
|
|
|
if stmt.dmConn.ConstParaOpt {
|
|
optParamList = make([]OptParameter, 0)
|
|
stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, optParamList, stmt.dmConn.getServerEncoding())
|
|
if err != nil {
|
|
stmt.close()
|
|
optParamList = nil
|
|
}
|
|
}
|
|
|
|
if execType == Dm_build_96 && dc.dmConnector.enRsCache {
|
|
rpv, err := rp.get(stmt, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if rpv != nil {
|
|
stmt.execInfo = rpv.execInfo
|
|
dc.lastExecInfo = rpv.execInfo
|
|
return newDmRows(rpv.getResultSet(stmt)), nil
|
|
}
|
|
}
|
|
|
|
var info *execRetInfo
|
|
|
|
if optParamList != nil && len(optParamList) > 0 {
|
|
info, err = dc.Access.Dm_build_1428(stmt, optParamList)
|
|
if err != nil {
|
|
stmt.nativeSql = query
|
|
info, err = dc.Access.Dm_build_1434(stmt, execType)
|
|
}
|
|
} else {
|
|
info, err = dc.Access.Dm_build_1434(stmt, execType)
|
|
}
|
|
|
|
if err != nil {
|
|
stmt.close()
|
|
return nil, err
|
|
}
|
|
dc.lastExecInfo = info
|
|
|
|
if execType == Dm_build_96 && info.hasResultSet {
|
|
return newDmRows(newInnerRows(0, stmt, info)), nil
|
|
} else {
|
|
return newDmResult(stmt, info), nil
|
|
}
|
|
}
|
|
|
|
func g2dbIsoLevel(isoLevel int32) int32 {
|
|
switch isoLevel {
|
|
case 1:
|
|
return Dm_build_84
|
|
case 2:
|
|
return Dm_build_85
|
|
case 4:
|
|
return Dm_build_86
|
|
case 6:
|
|
return Dm_build_87
|
|
default:
|
|
return -1
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) Begin() (driver.Tx, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.begin()
|
|
} else {
|
|
return dc.filterChain.reset().DmConnectionBegin(dc)
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.beginTx(ctx, opts)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
|
|
}
|
|
|
|
func (dc *DmConnection) Commit() error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.commit()
|
|
} else {
|
|
return dc.filterChain.reset().DmConnectionCommit(dc)
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) Rollback() error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.rollback()
|
|
} else {
|
|
return dc.filterChain.reset().DmConnectionRollback(dc)
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) Close() error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.close()
|
|
} else {
|
|
return dc.filterChain.reset().DmConnectionClose(dc)
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) Ping(ctx context.Context) error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.ping(ctx)
|
|
} else {
|
|
return dc.filterChain.reset().DmConnectionPing(dc, ctx)
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.exec(query, args)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionExec(dc, query, args)
|
|
}
|
|
|
|
func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.execContext(ctx, query, args)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
|
|
}
|
|
|
|
func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.query(query, args)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
|
|
}
|
|
|
|
func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.queryContext(ctx, query, args)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
|
|
}
|
|
|
|
func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.prepare(query)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionPrepare(dc, query)
|
|
}
|
|
|
|
func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.prepareContext(ctx, query)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
|
|
}
|
|
|
|
func (dc *DmConnection) ResetSession(ctx context.Context) error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.resetSession(ctx)
|
|
}
|
|
if err := dc.filterChain.reset().DmConnectionResetSession(dc, ctx); err != nil {
|
|
return driver.ErrBadConn
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
|
|
if len(dc.filterChain.filters) == 0 {
|
|
return dc.checkNamedValue(nv)
|
|
}
|
|
return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
|
|
}
|
|
|
|
func (dc *DmConnection) begin() (*DmConnection, error) {
|
|
return dc.beginTx(context.Background(), driver.TxOptions{driver.IsolationLevel(sql.LevelDefault), false})
|
|
}
|
|
|
|
func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
defer dc.finish()
|
|
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dc.autoCommit = false
|
|
|
|
if dc.ReadOnly != opts.ReadOnly {
|
|
dc.ReadOnly = opts.ReadOnly
|
|
var readonly = 0
|
|
if opts.ReadOnly {
|
|
readonly = 1
|
|
}
|
|
dc.exec(fmt.Sprintf("SP_SET_SESSION_READONLY(%d)", readonly), nil)
|
|
}
|
|
|
|
if dc.IsoLevel != int32(opts.Isolation) {
|
|
switch sql.IsolationLevel(opts.Isolation) {
|
|
case sql.LevelDefault:
|
|
dc.IsoLevel = int32(sql.LevelReadCommitted)
|
|
case sql.LevelReadUncommitted, sql.LevelReadCommitted, sql.LevelSerializable:
|
|
dc.IsoLevel = int32(opts.Isolation)
|
|
case sql.LevelRepeatableRead:
|
|
if dc.CompatibleMysql() {
|
|
dc.IsoLevel = int32(sql.LevelReadCommitted)
|
|
} else {
|
|
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
|
|
}
|
|
default:
|
|
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
|
|
}
|
|
|
|
err = dc.Access.Dm_build_1488(dc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return dc, nil
|
|
}
|
|
|
|
func (dc *DmConnection) commit() error {
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
dc.autoCommit = dc.dmConnector.autoCommit
|
|
if dc.ReadOnly {
|
|
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
|
|
}
|
|
}()
|
|
|
|
if !dc.autoCommit {
|
|
err = dc.Access.Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dc.trxFinish = true
|
|
return nil
|
|
} else if !dc.dmConnector.alwayseAllowCommit {
|
|
return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) rollback() error {
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
dc.autoCommit = dc.dmConnector.autoCommit
|
|
if dc.ReadOnly {
|
|
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
|
|
}
|
|
}()
|
|
|
|
if !dc.autoCommit {
|
|
err = dc.Access.Rollback()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dc.trxFinish = true
|
|
return nil
|
|
} else if !dc.dmConnector.alwayseAllowCommit {
|
|
return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) reconnect() error {
|
|
err := dc.Access.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, stmt := range dc.stmtMap {
|
|
|
|
for id, rs := range stmt.rsMap {
|
|
rs.Close()
|
|
delete(stmt.rsMap, id)
|
|
}
|
|
}
|
|
|
|
var newConn *DmConnection
|
|
if dc.dmConnector.group != nil {
|
|
if newConn, err = dc.dmConnector.group.connect(dc.dmConnector); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
newConn, err = dc.dmConnector.connect(context.Background())
|
|
}
|
|
|
|
oldMap := dc.stmtMap
|
|
newConn.mu = dc.mu
|
|
newConn.filterable = dc.filterable
|
|
*dc = *newConn
|
|
|
|
for _, stmt := range oldMap {
|
|
if stmt.closed {
|
|
continue
|
|
}
|
|
err = dc.Access.Dm_build_1406(stmt)
|
|
if err != nil {
|
|
stmt.free()
|
|
continue
|
|
}
|
|
|
|
if stmt.prepared || stmt.paramCount > 0 {
|
|
if err = stmt.prepare(); err != nil {
|
|
continue
|
|
}
|
|
}
|
|
|
|
dc.stmtMap[stmt.id] = stmt
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) cleanup() {
|
|
dc.close()
|
|
}
|
|
|
|
func (dc *DmConnection) close() error {
|
|
if !dc.closed.TrySet(true) {
|
|
return nil
|
|
}
|
|
|
|
util.AbsorbPanic(func() {
|
|
close(dc.closech)
|
|
})
|
|
if dc.Access == nil {
|
|
return nil
|
|
}
|
|
|
|
dc.rollback()
|
|
|
|
for _, stmt := range dc.stmtMap {
|
|
stmt.free()
|
|
}
|
|
|
|
dc.Access.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) ping(ctx context.Context) error {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return err
|
|
}
|
|
defer dc.finish()
|
|
|
|
rows, err := dc.query("select 1", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return rows.close()
|
|
}
|
|
|
|
func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if args != nil && len(args) > 0 {
|
|
stmt, err := dc.prepare(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer stmt.close()
|
|
dc.lastExecInfo = stmt.execInfo
|
|
|
|
return stmt.exec(args)
|
|
} else {
|
|
r1, err := dc.executeInner(query, Dm_build_97)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if r2, ok := r1.(*DmResult); ok {
|
|
return r2, nil
|
|
} else {
|
|
return nil, ECGO_NOT_EXEC_SQL.throw()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
defer dc.finish()
|
|
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if args != nil && len(args) > 0 {
|
|
stmt, err := dc.prepare(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer stmt.close()
|
|
dc.lastExecInfo = stmt.execInfo
|
|
dargs, err := namedValueToValue(stmt, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return stmt.exec(dargs)
|
|
} else {
|
|
r1, err := dc.executeInner(query, Dm_build_97)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if r2, ok := r1.(*DmResult); ok {
|
|
return r2, nil
|
|
} else {
|
|
return nil, ECGO_NOT_EXEC_SQL.throw()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
|
|
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if args != nil && len(args) > 0 {
|
|
stmt, err := dc.prepare(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dc.lastExecInfo = stmt.execInfo
|
|
|
|
stmt.innerUsed = true
|
|
return stmt.query(args)
|
|
|
|
} else {
|
|
r1, err := dc.executeInner(query, Dm_build_96)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if r2, ok := r1.(*DmRows); ok {
|
|
return r2, nil
|
|
} else {
|
|
return nil, ECGO_NOT_QUERY_SQL.throw()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
defer dc.finish()
|
|
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if args != nil && len(args) > 0 {
|
|
stmt, err := dc.prepare(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dc.lastExecInfo = stmt.execInfo
|
|
|
|
stmt.innerUsed = true
|
|
dargs, err := namedValueToValue(stmt, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return stmt.query(dargs)
|
|
|
|
} else {
|
|
r1, err := dc.executeInner(query, Dm_build_96)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if r2, ok := r1.(*DmRows); ok {
|
|
return r2, nil
|
|
} else {
|
|
return nil, ECGO_NOT_QUERY_SQL.throw()
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (dc *DmConnection) prepare(query string) (stmt *DmStatement, err error) {
|
|
if err = dc.checkClosed(); err != nil {
|
|
return
|
|
}
|
|
if stmt, err = NewDmStmt(dc, query); err != nil {
|
|
return
|
|
}
|
|
if err = stmt.prepare(); err != nil {
|
|
stmt.close()
|
|
stmt = nil
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
defer dc.finish()
|
|
|
|
return dc.prepare(query)
|
|
}
|
|
|
|
func (dc *DmConnection) resetSession(ctx context.Context) error {
|
|
if err := dc.watchCancel(ctx); err != nil {
|
|
return err
|
|
}
|
|
defer dc.finish()
|
|
|
|
err := dc.checkClosed()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
|
|
var err error
|
|
var cvt = converter{dc, false}
|
|
nv.Value, err = cvt.ConvertValue(nv.Value)
|
|
dc.isBatch = cvt.isBatch
|
|
return err
|
|
}
|
|
|
|
func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
|
|
stmt, err := NewDmStmt(dc, query)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
stmt.innerUsed = true
|
|
stmt.innerExec = true
|
|
info, err := dc.Access.Dm_build_1434(stmt, Dm_build_96)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
dc.lastExecInfo = info
|
|
stmt.innerExec = false
|
|
return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
|
|
}
|
|
|
|
func (dc *DmConnection) getIndexOnEPGroup() int32 {
|
|
if dc.dmConnector.group == nil || dc.dmConnector.group.epList == nil {
|
|
return -1
|
|
}
|
|
for i := 0; i < len(dc.dmConnector.group.epList); i++ {
|
|
ep := dc.dmConnector.group.epList[i]
|
|
if dc.dmConnector.host == ep.host && dc.dmConnector.port == ep.port {
|
|
return int32(i)
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (dc *DmConnection) getServerEncoding() string {
|
|
if dc.dmConnector.charCode != "" {
|
|
return dc.dmConnector.charCode
|
|
}
|
|
return dc.serverEncoding
|
|
}
|
|
|
|
func (dc *DmConnection) lobFetchAll() bool {
|
|
return dc.dmConnector.lobMode == 2
|
|
}
|
|
|
|
func (conn *DmConnection) CompatibleOracle() bool {
|
|
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_ORACLE
|
|
}
|
|
|
|
func (conn *DmConnection) CompatibleMysql() bool {
|
|
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_MYSQL
|
|
}
|
|
|
|
func (conn *DmConnection) cancel(err error) {
|
|
conn.canceled.Set(err)
|
|
conn.close()
|
|
|
|
}
|
|
|
|
func (conn *DmConnection) finish() {
|
|
if !conn.watching || conn.finished == nil {
|
|
return
|
|
}
|
|
select {
|
|
case conn.finished <- struct{}{}:
|
|
conn.watching = false
|
|
case <-conn.closech:
|
|
}
|
|
}
|
|
|
|
func (conn *DmConnection) startWatcher() {
|
|
watcher := make(chan context.Context, 1)
|
|
conn.watcher = watcher
|
|
finished := make(chan struct{})
|
|
conn.finished = finished
|
|
go func() {
|
|
for {
|
|
var ctx context.Context
|
|
select {
|
|
case ctx = <-watcher:
|
|
case <-conn.closech:
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
conn.cancel(ctx.Err())
|
|
case <-finished:
|
|
case <-conn.closech:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (conn *DmConnection) watchCancel(ctx context.Context) error {
|
|
if conn.watching {
|
|
|
|
conn.cleanup()
|
|
return nil
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if ctx.Done() == nil {
|
|
return nil
|
|
}
|
|
|
|
if conn.watcher == nil {
|
|
return nil
|
|
}
|
|
|
|
conn.watching = true
|
|
conn.watcher <- ctx
|
|
return nil
|
|
}
|
|
|
|
// noCopy is a zero-size marker embedded in structs that must not be copied
// after first use, so that `go vet`'s copylocks check reports such copies.
type noCopy struct{}

// Lock is a no-op; it exists only so that *noCopy looks like a lock to the
// copylocks check.
func (*noCopy) Lock() {}

// Unlock is a no-op. The copylocks check looks for types implementing
// sync.Locker, which requires Unlock alongside Lock.
func (*noCopy) Unlock() {}
|
|
|
|
// atomicBool is a boolean flag stored in a uint32 that its methods access
// through sync/atomic. The zero value reads as false.
type atomicBool struct {
	// _noCopy marks the struct as not to be copied.
	_noCopy noCopy
	// value holds the flag: 0 reads as false, any non-zero value as true.
	value uint32
}
|
|
|
|
func (ab *atomicBool) IsSet() bool {
|
|
return atomic.LoadUint32(&ab.value) > 0
|
|
}
|
|
|
|
func (ab *atomicBool) Set(value bool) {
|
|
if value {
|
|
atomic.StoreUint32(&ab.value, 1)
|
|
} else {
|
|
atomic.StoreUint32(&ab.value, 0)
|
|
}
|
|
}
|
|
|
|
func (ab *atomicBool) TrySet(value bool) bool {
|
|
if value {
|
|
return atomic.SwapUint32(&ab.value, 1) == 0
|
|
}
|
|
return atomic.SwapUint32(&ab.value, 0) > 0
|
|
}
|
|
|
|
type atomicError struct {
|
|
_noCopy noCopy
|
|
value atomic.Value
|
|
}
|
|
|
|
func (ae *atomicError) Set(value error) {
|
|
ae.value.Store(value)
|
|
}
|
|
|
|
func (ae *atomicError) Value() error {
|
|
if v := ae.value.Load(); v != nil {
|
|
|
|
return v.(error)
|
|
}
|
|
return nil
|
|
}
|